diff --git a/config.py b/config.py
index 0caa04c..5529fc8 100644
--- a/config.py
+++ b/config.py
@@ -314,6 +314,9 @@ def reload_config() -> dict:
global S2A_API_BASE, S2A_ADMIN_KEY, S2A_ADMIN_TOKEN
global S2A_CONCURRENCY, S2A_PRIORITY, S2A_GROUP_NAMES, S2A_GROUP_IDS, S2A_API_MODE
global CONCURRENT_ENABLED, CONCURRENT_WORKERS
+ global SCHEDULER_ENABLED, SCHEDULER_START_HOUR, SCHEDULER_END_HOUR
+ global SCHEDULER_BATCH_SIZE, SCHEDULER_COOLDOWN_MINUTES, SCHEDULER_OUTPUT_TYPE
+ global SCHEDULER_MAX_CONSECUTIVE_FAILURES
result = {
"success": True,
@@ -389,6 +392,16 @@ def reload_config() -> dict:
S2A_GROUP_IDS = _s2a.get("group_ids", [])
S2A_API_MODE = _s2a.get("api_mode", False)
+ # 定时调度器配置
+ _scheduler = _cfg.get("scheduler", {})
+ SCHEDULER_ENABLED = _scheduler.get("enabled", False)
+ SCHEDULER_START_HOUR = _scheduler.get("start_hour", 8)
+ SCHEDULER_END_HOUR = _scheduler.get("end_hour", 14)
+ SCHEDULER_BATCH_SIZE = _scheduler.get("batch_size", 50)
+ SCHEDULER_COOLDOWN_MINUTES = _scheduler.get("cooldown_minutes", 5)
+ SCHEDULER_OUTPUT_TYPE = _scheduler.get("output_type", "team")
+ SCHEDULER_MAX_CONSECUTIVE_FAILURES = _scheduler.get("max_consecutive_failures", 3)
+
except Exception as e:
errors.append(f"config.toml: {e}")
@@ -677,6 +690,16 @@ TELEGRAM_NOTIFY_ON_ERROR = _telegram.get("notify_on_error", True)
TELEGRAM_CHECK_INTERVAL = _telegram.get("check_interval", 3600) # 默认1小时检查一次
TELEGRAM_LOW_STOCK_THRESHOLD = _telegram.get("low_stock_threshold", 10) # 低库存阈值
+# 定时调度器配置
+_scheduler = _cfg.get("scheduler", {})
+SCHEDULER_ENABLED = _scheduler.get("enabled", False) # 是否启用定时调度
+SCHEDULER_START_HOUR = _scheduler.get("start_hour", 8) # 开始时间 (小时, 0-23)
+SCHEDULER_END_HOUR = _scheduler.get("end_hour", 14) # 结束时间 (小时, 0-23)
+SCHEDULER_BATCH_SIZE = _scheduler.get("batch_size", 50) # 每轮注册数量
+SCHEDULER_COOLDOWN_MINUTES = _scheduler.get("cooldown_minutes", 5) # 轮次间冷却 (分钟)
+SCHEDULER_OUTPUT_TYPE = _scheduler.get("output_type", "team") # 输出方式: team / json
+SCHEDULER_MAX_CONSECUTIVE_FAILURES = _scheduler.get("max_consecutive_failures", 3) # 连续失败N轮后暂停
+
# 代理
# 注意: proxy_enabled 和 proxies 可能在顶层或被误放在 browser section 下
_proxy_enabled_top = _cfg.get("proxy_enabled")
diff --git a/config.toml.example b/config.toml.example
index e4c7ce0..462939c 100644
--- a/config.toml.example
+++ b/config.toml.example
@@ -235,6 +235,24 @@ check_interval = 3600
# 低库存预警阈值 (正常账号数低于此值时预警)
low_stock_threshold = 10
+# ==================== 定时调度器配置 ====================
+# 时间窗口内自动循环执行: 注册 → run_all → 冷却 → 重复
+# 通过 Telegram Bot 的 /schedule 命令开启/关闭
+[scheduler]
+# 是否启用定时调度 (也可通过 /schedule on 命令开启)
+enabled = false
+# 时间窗口: 仅在此时间段内运行 (24小时制)
+start_hour = 8
+end_hour = 14
+# 每轮注册的 GPT Team 账号数量
+batch_size = 50
+# 每轮完成后的冷却时间 (分钟)
+cooldown_minutes = 5
+# 注册输出方式: "team" (写入 team.json 供 run_all 处理)
+output_type = "team"
+# 连续失败 N 轮后自动暂停调度器并发送告警
+max_consecutive_failures = 3
+
# ==================== AutoGPTPlus 配置 ====================
# 独立的 ChatGPT 订阅自动化脚本配置
[autogptplus]
diff --git a/telegram_bot.py b/telegram_bot.py
index cdb25ab..90e9113 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -19,6 +19,11 @@ from telegram.ext import (
ContextTypes,
)
+from datetime import datetime, timedelta, timezone, time as dt_time
+
+# 北京时间 UTC+8
+BEIJING_TZ = timezone(timedelta(hours=8))
+
from config import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ADMIN_CHAT_IDS,
@@ -51,6 +56,13 @@ from config import (
S2A_API_MODE,
BROWSER_RANDOM_FINGERPRINT,
batch_remove_teams_by_names,
+ SCHEDULER_ENABLED,
+ SCHEDULER_START_HOUR,
+ SCHEDULER_END_HOUR,
+ SCHEDULER_BATCH_SIZE,
+ SCHEDULER_COOLDOWN_MINUTES,
+ SCHEDULER_OUTPUT_TYPE,
+ SCHEDULER_MAX_CONSECUTIVE_FAILURES,
)
from utils import load_team_tracker, get_all_incomplete_accounts, save_team_tracker, get_completed_teams, batch_remove_completed_teams
from bot_notifier import BotNotifier, set_notifier, progress_finish
@@ -105,6 +117,20 @@ class ProvisionerBot:
"errors": []
}
self._import_batch_timeout_task = None # 超时任务
+ # ==================== 调度器状态 ====================
+ self._scheduler_active = False # 调度器是否正在运行
+ self._scheduler_task: Optional[asyncio.Task] = None # 调度器 asyncio.Task
+ self._scheduler_stop_event = asyncio.Event() # 用于通知调度器停止
+ self._scheduler_round = 0 # 当前轮次
+ self._scheduler_stats = { # 累计统计
+ "total_rounds": 0,
+ "total_registered": 0,
+ "total_ingested": 0,
+ "total_failed": 0,
+ "consecutive_failures": 0,
+ "start_time": None,
+ }
+ self._scheduler_suspended_date = None # /stop 后挂起当天调度 (date 对象)
async def start(self):
"""启动 Bot"""
@@ -168,6 +194,9 @@ class ProvisionerBot:
("keys_usage", self.cmd_keys_usage),
("autogptplus", self.cmd_autogptplus),
("update_token", self.cmd_update_token),
+ ("schedule", self.cmd_schedule),
+ ("schedule_config", self.cmd_schedule_config),
+ ("schedule_status", self.cmd_schedule_status),
]
for cmd, handler in handlers:
self.app.add_handler(CommandHandler(cmd, handler))
@@ -228,6 +257,16 @@ class ProvisionerBot:
)
log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s")
+ # 注册定时调度器 (每天 start_hour 触发)
+ if SCHEDULER_ENABLED:
+ trigger_time = dt_time(hour=SCHEDULER_START_HOUR, minute=0, second=0, tzinfo=BEIJING_TZ)
+ self.app.job_queue.run_daily(
+ self._scheduler_daily_trigger,
+ time=trigger_time,
+ name="scheduler_daily"
+ )
+ log.info(f"Scheduler registered: daily at {SCHEDULER_START_HOUR:02d}:00 - {SCHEDULER_END_HOUR:02d}:00 (Beijing Time)")
+
# 启动通知器
await self.notifier.start()
@@ -1395,7 +1434,13 @@ class ProvisionerBot:
@admin_only
async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""强制停止当前任务"""
- if not self.current_task or self.current_task.done():
+ # 如果调度器正在运行
+ scheduler_was_active = self._scheduler_active
+ if self._scheduler_active:
+ self._scheduler_stop_event.set()
+ self._scheduler_suspended_date = datetime.now().date() # 挂起今天的调度
+
+ if (not self.current_task or self.current_task.done()) and not scheduler_was_active:
await update.message.reply_text("📭 当前没有运行中的任务")
return
@@ -1477,6 +1522,16 @@ class ProvisionerBot:
await update.message.reply_text("\n".join(report_lines), parse_mode="HTML")
+ # 如果调度器被停止,额外通知
+ if scheduler_was_active:
+ await update.message.reply_text(
+ "⏰ 调度器已停止\n\n"
+ "调度器已禁用,不会继续运行\n"
+ "明天 08:00 将自动重新启动\n\n"
+ "如需手动恢复: /schedule on",
+ parse_mode="HTML"
+ )
+
except Exception as e:
await update.message.reply_text(f"❌ 停止任务时出错: {e}")
@@ -6263,6 +6318,669 @@ class ProvisionerBot:
except Exception as e:
await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}")
+ # ==================== 定时调度器 ====================
+
+ @admin_only
+ async def cmd_schedule(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """开启/关闭定时调度器"""
+ import config as cfg
+
+ args = context.args
+ if not args:
+ # 显示当前状态
+ status = "✅ 运行中" if self._scheduler_active else ("⏸️ 已启用 (等待触发)" if cfg.SCHEDULER_ENABLED else "❌ 已关闭")
+ text = (
+ f"⏰ 定时调度器\n\n"
+ f"状态: {status}\n"
+ f"时间窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
+ f"每轮注册: {cfg.SCHEDULER_BATCH_SIZE} 个\n"
+ f"冷却时间: {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟\n"
+ f"连续失败上限: {cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES} 轮\n\n"
+ f"用法:\n"
+ f" /schedule on - 立即开启\n"
+ f" /schedule off - 关闭\n"
+ f" /schedule_config - 配置参数\n"
+ f" /schedule_status - 查看运行状态"
+ )
+ await update.message.reply_text(text, parse_mode="HTML")
+ return
+
+ action = args[0].lower()
+
+ if action == "on":
+ if self._scheduler_active:
+ await update.message.reply_text("⚠️ 调度器已经在运行中")
+ return
+
+ if self.current_task and not self.current_task.done():
+ await update.message.reply_text(
+ f"⚠️ 有任务正在运行: {self.current_team}\n"
+ "请先等待任务完成或使用 /stop 停止"
+ )
+ return
+
+ # 更新 config
+ cfg.SCHEDULER_ENABLED = True
+
+ # 检查是否在时间窗口内
+ now = datetime.now()
+ if cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR:
+ # 在时间窗口内,立即启动
+ self._start_scheduler()
+ await update.message.reply_text(
+ f"✅ 调度器已启动\n\n"
+ f"当前在时间窗口内,立即开始\n"
+ f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
+ f"每轮: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 → run_all → 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟",
+ parse_mode="HTML"
+ )
+ else:
+ # 不在时间窗口内,注册 daily job
+ self._register_scheduler_daily_job()
+ await update.message.reply_text(
+ f"✅ 调度器已启用\n\n"
+ f"当前不在时间窗口内\n"
+ f"将在每天 {cfg.SCHEDULER_START_HOUR:02d}:00 自动启动\n"
+ f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00",
+ parse_mode="HTML"
+ )
+
+ elif action == "off":
+ cfg.SCHEDULER_ENABLED = False
+ if self._scheduler_active:
+ self._scheduler_stop_event.set()
+ await update.message.reply_text(
+ "🛑 调度器正在停止\n\n"
+ "当前轮次完成后将停止调度器\n"
+ "如需立即停止任务,请使用 /stop",
+ parse_mode="HTML"
+ )
+ else:
+ # 移除 daily job
+ self._remove_scheduler_daily_job()
+ await update.message.reply_text(
+ "❌ 调度器已关闭",
+ parse_mode="HTML"
+ )
+ else:
+ await update.message.reply_text(
+ "用法: /schedule on 或 /schedule off",
+ parse_mode="HTML"
+ )
+
+ @admin_only
+ async def cmd_schedule_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """配置调度器参数"""
+ import config as cfg
+ import tomli_w
+
+ args = context.args
+ if not args:
+ text = (
+ f"⚙️ 调度器配置\n\n"
+ f"start_hour = {cfg.SCHEDULER_START_HOUR} (开始时间)\n"
+ f"end_hour = {cfg.SCHEDULER_END_HOUR} (结束时间)\n"
+ f"batch_size = {cfg.SCHEDULER_BATCH_SIZE} (每轮注册数)\n"
+ f"cooldown = {cfg.SCHEDULER_COOLDOWN_MINUTES} (冷却分钟)\n"
+ f"max_failures = {cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES} (连续失败上限)\n\n"
+ f"修改示例:\n"
+ f" /schedule_config batch_size 30\n"
+ f" /schedule_config start_hour 9\n"
+ f" /schedule_config cooldown 10"
+ )
+ await update.message.reply_text(text, parse_mode="HTML")
+ return
+
+ if len(args) < 2:
+ await update.message.reply_text("❌ 用法: /schedule_config 参数名 值", parse_mode="HTML")
+ return
+
+ key = args[0].lower()
+ try:
+ value = int(args[1])
+ except ValueError:
+ await update.message.reply_text("❌ 值必须为整数")
+ return
+
+ # 参数映射
+ config_map = {
+ "start_hour": ("start_hour", 0, 23),
+ "end_hour": ("end_hour", 0, 23),
+ "batch_size": ("batch_size", 1, 200),
+ "cooldown": ("cooldown_minutes", 1, 60),
+ "max_failures": ("max_consecutive_failures", 1, 20),
+ }
+
+ if key not in config_map:
+ valid_keys = ", ".join(config_map.keys())
+ await update.message.reply_text(f"❌ 无效参数。可选: {valid_keys}")
+ return
+
+ toml_key, min_val, max_val = config_map[key]
+ if not (min_val <= value <= max_val):
+ await update.message.reply_text(f"❌ 值必须在 {min_val}-{max_val} 之间")
+ return
+
+ # 更新内存中的配置
+ attr_name = f"SCHEDULER_{toml_key.upper()}"
+ old_value = getattr(cfg, attr_name)
+ setattr(cfg, attr_name, value)
+
+ # 持久化到 config.toml
+ try:
+ import tomllib
+ with open(CONFIG_FILE, "rb") as f:
+ config = tomllib.load(f)
+
+ if "scheduler" not in config:
+ config["scheduler"] = {}
+ config["scheduler"][toml_key] = value
+
+ with open(CONFIG_FILE, "wb") as f:
+ tomli_w.dump(config, f)
+
+ await update.message.reply_text(
+ f"✅ 配置已更新\n\n"
+ f"{key}: {old_value} → {value}\n\n"
+ f"💡 如调度器正在运行,下轮生效",
+ parse_mode="HTML"
+ )
+ except ImportError:
+ await update.message.reply_text(
+ f"⚠️ 内存配置已更新 ({key}: {old_value} → {value})\n"
+ "但无法持久化: 缺少 tomli_w 依赖"
+ )
+ except Exception as e:
+ await update.message.reply_text(
+ f"⚠️ 内存配置已更新 ({key}: {old_value} → {value})\n"
+ f"但持久化失败: {e}"
+ )
+
+ @admin_only
+ async def cmd_schedule_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """查看调度器运行状态"""
+ import config as cfg
+
+ if not self._scheduler_active:
+ status = "⏸️ 已启用 (等待触发)" if cfg.SCHEDULER_ENABLED else "❌ 未启用"
+ await update.message.reply_text(
+ f"📊 调度器状态\n\n"
+ f"状态: {status}\n\n"
+ f"使用 /schedule on 启动",
+ parse_mode="HTML"
+ )
+ return
+
+ stats = self._scheduler_stats
+ elapsed = ""
+ if stats["start_time"]:
+ delta = datetime.now() - stats["start_time"]
+ hours, remainder = divmod(int(delta.total_seconds()), 3600)
+ minutes, seconds = divmod(remainder, 60)
+ elapsed = f"{hours}h{minutes}m{seconds}s"
+
+ text = (
+ f"📊 调度器运行状态\n\n"
+ f"🟢 运行中 | 第 {self._scheduler_round} 轮\n"
+ f"⏱️ 已运行: {elapsed}\n"
+ f"⏰ 窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n\n"
+ f"累计统计:\n"
+ f" 📝 注册: {stats['total_registered']} 个\n"
+ f" 📥 入库: {stats['total_ingested']} 个\n"
+ f" ❌ 失败轮次: {stats['total_failed']}\n"
+ f" 🔄 总轮次: {stats['total_rounds']}\n"
+ f" ⚠️ 连续失败: {stats['consecutive_failures']}/{cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES}"
+ )
+
+ if self.current_team:
+ text += f"\n\n🏃 当前任务: {self.current_team}"
+
+ await update.message.reply_text(text, parse_mode="HTML")
+
+ def _register_scheduler_daily_job(self):
+ """注册每日定时触发 Job"""
+ import config as cfg
+
+ # 先移除旧的 job
+ self._remove_scheduler_daily_job()
+
+ if self.app and self.app.job_queue:
+ trigger_time = dt_time(hour=cfg.SCHEDULER_START_HOUR, minute=0, second=0, tzinfo=BEIJING_TZ)
+ self.app.job_queue.run_daily(
+ self._scheduler_daily_trigger,
+ time=trigger_time,
+ name="scheduler_daily"
+ )
+ log.info(f"Scheduler daily job registered at {cfg.SCHEDULER_START_HOUR:02d}:00 (Beijing Time)")
+
+ def _remove_scheduler_daily_job(self):
+ """移除每日定时触发 Job"""
+ if self.app and self.app.job_queue:
+ jobs = self.app.job_queue.get_jobs_by_name("scheduler_daily")
+ for job in jobs:
+ job.schedule_removal()
+
+ async def _scheduler_daily_trigger(self, context: ContextTypes.DEFAULT_TYPE):
+ """每日定时触发回调 - 由 job_queue 调用"""
+ import config as cfg
+
+ if not cfg.SCHEDULER_ENABLED:
+ return
+
+ # 检查是否被 /stop 挂起了今天的调度
+ today = datetime.now().date()
+ if self._scheduler_suspended_date == today:
+ log.info(f"Scheduler suspended for today ({today}), skipping")
+ return
+
+ if self._scheduler_active:
+ log.info("Scheduler already active, skipping daily trigger")
+ return
+
+ if self.current_task and not self.current_task.done():
+ log.warning("Task already running, skipping scheduler trigger")
+ # 通知管理员
+ for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
+ try:
+ await context.bot.send_message(
+ chat_id,
+ f"⚠️ 调度器触发跳过\n\n"
+ f"当前有任务在运行: {self.current_team}\n"
+ f"请任务完成后手动使用 /schedule on 启动",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+ return
+
+ # 启动调度器
+ self._start_scheduler()
+
+ # 通知管理员
+ for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
+ try:
+ await context.bot.send_message(
+ chat_id,
+ f"⏰ 定时调度器已启动\n\n"
+ f"时间窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
+ f"每轮: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 → run_all → 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+
+ def _start_scheduler(self):
+ """启动调度器循环"""
+ self._scheduler_active = True
+ self._scheduler_stop_event.clear()
+ self._scheduler_round = 0
+ self._scheduler_stats = {
+ "total_rounds": 0,
+ "total_registered": 0,
+ "total_ingested": 0,
+ "total_failed": 0,
+ "consecutive_failures": 0,
+ "start_time": datetime.now(),
+ }
+ self._scheduler_task = asyncio.create_task(self._scheduler_loop())
+
+ async def _scheduler_loop(self):
+ """调度器主循环: 注册 → run_all → 冷却 → 重复"""
+ import config as cfg
+
+ log.section("定时调度器启动")
+ chat_id = TELEGRAM_ADMIN_CHAT_IDS[0] if TELEGRAM_ADMIN_CHAT_IDS else None
+
+ try:
+ while not self._scheduler_stop_event.is_set():
+ # ===== 时间窗口检查 =====
+ now = datetime.now()
+ if not (cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR):
+ log.info(f"超出时间窗口 ({cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00),调度结束")
+ break
+
+ self._scheduler_round += 1
+ round_num = self._scheduler_round
+ log.section(f"调度器 - 第 {round_num} 轮")
+
+ # ===== Phase 1: Provisioning (注册 GPT Team 账号) =====
+ log.info(f"[Phase 1] 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 GPT Team 账号...")
+
+ if chat_id:
+ try:
+ await self.app.bot.send_message(
+ chat_id,
+ f"🔄 第 {round_num} 轮开始\n\n"
+ f"📝 Phase 1: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个账号...\n"
+ f"⏰ {now.strftime('%H:%M:%S')}",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+
+ # 重置停止标志
+ try:
+ import run
+ run._shutdown_requested = False
+ except Exception:
+ pass
+
+ self.current_team = f"调度器-第{round_num}轮-注册"
+
+ # 执行注册
+ reg_success = 0
+ reg_fail = 0
+ try:
+ reg_success, reg_fail = await self._scheduler_run_registration(
+ chat_id, cfg.SCHEDULER_BATCH_SIZE, cfg.SCHEDULER_OUTPUT_TYPE
+ )
+ except Exception as e:
+ log.error(f"调度器注册阶段异常: {e}")
+ reg_fail = cfg.SCHEDULER_BATCH_SIZE
+
+ self._scheduler_stats["total_registered"] += reg_success
+
+ # 检查中断
+ if self._scheduler_stop_event.is_set():
+ break
+
+ # 注册全部失败检查
+ if reg_success == 0:
+ self._scheduler_stats["consecutive_failures"] += 1
+ self._scheduler_stats["total_failed"] += 1
+ log.warning(f"第 {round_num} 轮注册全部失败 (连续失败: {self._scheduler_stats['consecutive_failures']})")
+
+ if self._scheduler_stats["consecutive_failures"] >= cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES:
+ log.error(f"连续失败 {self._scheduler_stats['consecutive_failures']} 轮,调度器自动暂停")
+ if chat_id:
+ try:
+ await self.app.bot.send_message(
+ chat_id,
+ f"🚨 调度器自动暂停\n\n"
+ f"连续 {self._scheduler_stats['consecutive_failures']} 轮注册全部失败\n"
+ f"请检查配置后使用 /schedule on 重新启动\n\n"
+ f"可能原因:\n"
+ f"• IBAN 已用完\n"
+ f"• 邮件 API 异常\n"
+ f"• 域名全部拉黑",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+ break
+ continue # 跳过 run_all,直接下一轮
+ else:
+ self._scheduler_stats["consecutive_failures"] = 0
+
+ # ===== Phase 1.5: Verify (验证 account_id) =====
+ log.info(f"[Phase 1.5] 验证注册账号 account_id...")
+
+ if chat_id:
+ try:
+ await self.app.bot.send_message(
+ chat_id,
+ f"🔍 第 {round_num} 轮 - Phase 1.5\n\n"
+ f"📝 注册完成: ✅ {reg_success} / ❌ {reg_fail}\n"
+ f"🔍 正在验证 account_id...",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+
+ self.current_team = f"调度器-第{round_num}轮-验证"
+
+ try:
+ await self._validate_and_cleanup_accounts(chat_id, force_all=True)
+ log.success("verify_all 完成")
+ except Exception as e:
+ log.error(f"调度器验证阶段异常: {e}")
+
+ # 检查中断
+ if self._scheduler_stop_event.is_set():
+ break
+
+ # ===== Phase 2: Ingestion (run_all) =====
+ log.info(f"[Phase 2] 执行 run_all 入库处理...")
+
+ if chat_id:
+ try:
+ await self.app.bot.send_message(
+ chat_id,
+ f"🚀 第 {round_num} 轮 - Phase 2\n\n"
+ f"📝 注册: ✅ {reg_success} / ❌ {reg_fail}\n"
+ f"🔍 验证完成\n"
+ f"📥 开始 run_all 入库处理...",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+
+ self.current_team = f"调度器-第{round_num}轮-run_all"
+
+ ingested = 0
+ try:
+ ingested = await self._scheduler_run_all()
+ except Exception as e:
+ log.error(f"调度器 run_all 阶段异常: {e}")
+
+ self._scheduler_stats["total_ingested"] += ingested
+ self._scheduler_stats["total_rounds"] += 1
+
+ # 检查中断
+ if self._scheduler_stop_event.is_set():
+ break
+
+ # ===== Graceful Boundary: 任务完成后检查时间 =====
+ now = datetime.now()
+ if not (cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR):
+ log.info(f"run_all 完成后已超出时间窗口 ({now.strftime('%H:%M:%S')}),调度结束")
+ if chat_id:
+ try:
+ await self.app.bot.send_message(
+ chat_id,
+ f"⏰ 第 {round_num} 轮完成后已超出时间窗口\n\n"
+ f"当前: {now.strftime('%H:%M:%S')}\n"
+ f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
+ f"本日调度结束",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+ break
+
+ # ===== Phase 3: Cooldown =====
+ cooldown = cfg.SCHEDULER_COOLDOWN_MINUTES * 60
+ log.info(f"[Phase 3] 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟...")
+
+ if chat_id:
+ try:
+ next_time = now + timedelta(seconds=cooldown)
+ await self.app.bot.send_message(
+ chat_id,
+ f"⏳ 第 {round_num} 轮完成\n\n"
+ f"✅ 注册: {reg_success} | 📥 入库: {ingested}\n"
+ f"冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟...\n"
+ f"下轮预计: {next_time.strftime('%H:%M:%S')}",
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+
+ self.current_team = f"调度器-冷却中"
+
+ # 可中断的等待
+ try:
+ await asyncio.wait_for(
+ self._scheduler_stop_event.wait(),
+ timeout=cooldown
+ )
+ # 如果 wait 返回,说明收到了停止信号
+ break
+ except asyncio.TimeoutError:
+ # 正常超时,继续下一轮
+ pass
+
+ except Exception as e:
+ log.error(f"调度器异常退出: {e}")
+ finally:
+ self._scheduler_active = False
+ self.current_task = None
+ self.current_team = None
+ log.info("调度器已停止")
+
+ # 发送日报
+ await self._send_daily_report()
+
+ async def _scheduler_run_registration(self, chat_id: int, count: int, output_type: str) -> tuple:
+ """调度器: 执行注册阶段,返回 (success_count, fail_count)"""
+ from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode
+ from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS
+ import json
+ import threading
+
+ results = []
+ success_count = 0
+ fail_count = 0
+ results_lock = threading.Lock()
+
+ current_mode = get_register_mode()
+ workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1
+ workers = min(workers, count)
+
+ task_queue = list(range(count))
+ queue_lock = threading.Lock()
+
+ def worker_task(worker_id: int):
+ nonlocal success_count, fail_count
+
+ while True:
+ try:
+ import run
+ if run._shutdown_requested:
+ break
+ except Exception:
+ pass
+
+ with queue_lock:
+ if not task_queue:
+ break
+ task_idx = task_queue.pop(0)
+
+ try:
+ result = run_single_registration_auto(
+ progress_callback=None,
+ step_callback=None
+ )
+
+ with results_lock:
+ if result.get("stopped"):
+ break
+ elif result.get("success"):
+ success_count += 1
+ results.append({
+ "account": result["account"],
+ "password": result["password"],
+ "token": result["token"],
+ "account_id": result.get("account_id", "")
+ })
+ else:
+ fail_count += 1
+ except Exception as e:
+ with results_lock:
+ fail_count += 1
+ log.error(f"Scheduler Worker {worker_id}: 注册异常: {e}")
+
+ cleanup_chrome_processes()
+
+ # 使用线程池并发执行
+ import concurrent.futures
+ loop = asyncio.get_event_loop()
+ with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
+ async_futures = [loop.run_in_executor(executor, worker_task, i) for i in range(workers)]
+ await asyncio.gather(*async_futures, return_exceptions=True)
+
+ # 保存结果到 team.json
+ if results and output_type == "team":
+ try:
+ team_file = TEAM_JSON_FILE
+ existing = []
+ if team_file.exists():
+ with open(team_file, "r", encoding="utf-8") as f:
+ existing = json.load(f)
+
+ existing.extend(results)
+
+ with open(team_file, "w", encoding="utf-8") as f:
+ json.dump(existing, f, ensure_ascii=False, indent=2)
+
+ reload_config()
+ log.success(f"调度器: {success_count} 个账号已写入 team.json (总计 {len(existing)})")
+ except Exception as e:
+ log.error(f"调度器: 保存 team.json 失败: {e}")
+
+ return (success_count, fail_count)
+
+ async def _scheduler_run_all(self) -> int:
+ """调度器: 执行 run_all,返回成功入库数"""
+ from run import run_all_teams
+ from team_service import preload_all_account_ids
+ from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
+ from config import DEFAULT_PASSWORD
+
+ loop = asyncio.get_event_loop()
+
+ def _task():
+ preload_all_account_ids()
+ _tracker = load_team_tracker()
+ add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
+ save_team_tracker(_tracker)
+ return run_all_teams()
+
+ results = await loop.run_in_executor(self.executor, _task)
+
+ # 自动清理 team.json 和 team_tracker.json
+ await self._auto_clean_after_run_all()
+
+ # 统计成功入库数
+ ingested = sum(1 for r in (results or []) if r.get("status") in ("success", "completed"))
+ return ingested
+
+ async def _send_daily_report(self):
+ """发送调度器日报"""
+ stats = self._scheduler_stats
+ if stats["total_rounds"] == 0 and stats["total_registered"] == 0:
+ return # 没有任何数据,不发送
+
+ elapsed = ""
+ if stats["start_time"]:
+ delta = datetime.now() - stats["start_time"]
+ hours, remainder = divmod(int(delta.total_seconds()), 3600)
+ minutes, _ = divmod(remainder, 60)
+ elapsed = f"{hours}h{minutes}m"
+
+ text = (
+ f"📊 调度器日报\n\n"
+ f"⏱️ 运行时长: {elapsed}\n"
+ f"🔄 总轮次: {stats['total_rounds']}\n\n"
+ f"累计结果:\n"
+ f" 📝 注册成功: {stats['total_registered']} 个\n"
+ f" 📥 入库成功: {stats['total_ingested']} 个\n"
+ f" ❌ 失败轮次: {stats['total_failed']}\n"
+ )
+
+ if stats["consecutive_failures"] >= 1:
+ text += f"\n⚠️ 最终连续失败: {stats['consecutive_failures']} 轮"
+
+ for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
+ try:
+ await self.app.bot.send_message(
+ chat_id,
+ text,
+ parse_mode="HTML"
+ )
+ except Exception:
+ pass
+
async def main():
"""主函数"""