feat: add configurable timed scheduler to the Telegram bot for automated tasks.

This commit is contained in:
2026-02-10 02:38:45 +08:00
parent b9dd421714
commit 4d5fa36183
3 changed files with 760 additions and 1 deletions

View File

@@ -19,6 +19,11 @@ from telegram.ext import (
ContextTypes,
)
from datetime import datetime, timedelta, timezone, time as dt_time
# 北京时间 UTC+8
BEIJING_TZ = timezone(timedelta(hours=8))
from config import (
TELEGRAM_BOT_TOKEN,
TELEGRAM_ADMIN_CHAT_IDS,
@@ -51,6 +56,13 @@ from config import (
S2A_API_MODE,
BROWSER_RANDOM_FINGERPRINT,
batch_remove_teams_by_names,
SCHEDULER_ENABLED,
SCHEDULER_START_HOUR,
SCHEDULER_END_HOUR,
SCHEDULER_BATCH_SIZE,
SCHEDULER_COOLDOWN_MINUTES,
SCHEDULER_OUTPUT_TYPE,
SCHEDULER_MAX_CONSECUTIVE_FAILURES,
)
from utils import load_team_tracker, get_all_incomplete_accounts, save_team_tracker, get_completed_teams, batch_remove_completed_teams
from bot_notifier import BotNotifier, set_notifier, progress_finish
@@ -105,6 +117,20 @@ class ProvisionerBot:
"errors": []
}
self._import_batch_timeout_task = None # 超时任务
# ==================== 调度器状态 ====================
self._scheduler_active = False # 调度器是否正在运行
self._scheduler_task: Optional[asyncio.Task] = None # 调度器 asyncio.Task
self._scheduler_stop_event = asyncio.Event() # 用于通知调度器停止
self._scheduler_round = 0 # 当前轮次
self._scheduler_stats = { # 累计统计
"total_rounds": 0,
"total_registered": 0,
"total_ingested": 0,
"total_failed": 0,
"consecutive_failures": 0,
"start_time": None,
}
self._scheduler_suspended_date = None # /stop 后挂起当天调度 (date 对象)
async def start(self):
"""启动 Bot"""
@@ -168,6 +194,9 @@ class ProvisionerBot:
("keys_usage", self.cmd_keys_usage),
("autogptplus", self.cmd_autogptplus),
("update_token", self.cmd_update_token),
("schedule", self.cmd_schedule),
("schedule_config", self.cmd_schedule_config),
("schedule_status", self.cmd_schedule_status),
]
for cmd, handler in handlers:
self.app.add_handler(CommandHandler(cmd, handler))
@@ -228,6 +257,16 @@ class ProvisionerBot:
)
log.info(f"Stock check scheduled every {TELEGRAM_CHECK_INTERVAL}s")
# 注册定时调度器 (每天 start_hour 触发)
if SCHEDULER_ENABLED:
trigger_time = dt_time(hour=SCHEDULER_START_HOUR, minute=0, second=0, tzinfo=BEIJING_TZ)
self.app.job_queue.run_daily(
self._scheduler_daily_trigger,
time=trigger_time,
name="scheduler_daily"
)
log.info(f"Scheduler registered: daily at {SCHEDULER_START_HOUR:02d}:00 - {SCHEDULER_END_HOUR:02d}:00 (Beijing Time)")
# 启动通知器
await self.notifier.start()
@@ -1395,7 +1434,13 @@ class ProvisionerBot:
@admin_only
async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""强制停止当前任务"""
if not self.current_task or self.current_task.done():
# 如果调度器正在运行
scheduler_was_active = self._scheduler_active
if self._scheduler_active:
self._scheduler_stop_event.set()
self._scheduler_suspended_date = datetime.now().date() # 挂起今天的调度
if (not self.current_task or self.current_task.done()) and not scheduler_was_active:
await update.message.reply_text("📭 当前没有运行中的任务")
return
@@ -1477,6 +1522,16 @@ class ProvisionerBot:
await update.message.reply_text("\n".join(report_lines), parse_mode="HTML")
# 如果调度器被停止,额外通知
if scheduler_was_active:
await update.message.reply_text(
"<b>⏰ 调度器已停止</b>\n\n"
"调度器已禁用,不会继续运行\n"
"明天 08:00 将自动重新启动\n\n"
"如需手动恢复: <code>/schedule on</code>",
parse_mode="HTML"
)
except Exception as e:
await update.message.reply_text(f"❌ 停止任务时出错: {e}")
@@ -6263,6 +6318,669 @@ class ProvisionerBot:
except Exception as e:
await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}")
# ==================== 定时调度器 ====================
@admin_only
async def cmd_schedule(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""开启/关闭定时调度器"""
import config as cfg
args = context.args
if not args:
# 显示当前状态
status = "✅ 运行中" if self._scheduler_active else ("⏸️ 已启用 (等待触发)" if cfg.SCHEDULER_ENABLED else "❌ 已关闭")
text = (
f"<b>⏰ 定时调度器</b>\n\n"
f"状态: {status}\n"
f"时间窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
f"每轮注册: {cfg.SCHEDULER_BATCH_SIZE}\n"
f"冷却时间: {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟\n"
f"连续失败上限: {cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES}\n\n"
f"用法:\n"
f" <code>/schedule on</code> - 立即开启\n"
f" <code>/schedule off</code> - 关闭\n"
f" <code>/schedule_config</code> - 配置参数\n"
f" <code>/schedule_status</code> - 查看运行状态"
)
await update.message.reply_text(text, parse_mode="HTML")
return
action = args[0].lower()
if action == "on":
if self._scheduler_active:
await update.message.reply_text("⚠️ 调度器已经在运行中")
return
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 有任务正在运行: {self.current_team}\n"
"请先等待任务完成或使用 /stop 停止"
)
return
# 更新 config
cfg.SCHEDULER_ENABLED = True
# 检查是否在时间窗口内
now = datetime.now()
if cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR:
# 在时间窗口内,立即启动
self._start_scheduler()
await update.message.reply_text(
f"<b>✅ 调度器已启动</b>\n\n"
f"当前在时间窗口内,立即开始\n"
f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
f"每轮: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 → run_all → 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟",
parse_mode="HTML"
)
else:
# 不在时间窗口内,注册 daily job
self._register_scheduler_daily_job()
await update.message.reply_text(
f"<b>✅ 调度器已启用</b>\n\n"
f"当前不在时间窗口内\n"
f"将在每天 {cfg.SCHEDULER_START_HOUR:02d}:00 自动启动\n"
f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00",
parse_mode="HTML"
)
elif action == "off":
cfg.SCHEDULER_ENABLED = False
if self._scheduler_active:
self._scheduler_stop_event.set()
await update.message.reply_text(
"<b>🛑 调度器正在停止</b>\n\n"
"当前轮次完成后将停止调度器\n"
"如需立即停止任务,请使用 /stop",
parse_mode="HTML"
)
else:
# 移除 daily job
self._remove_scheduler_daily_job()
await update.message.reply_text(
"<b>❌ 调度器已关闭</b>",
parse_mode="HTML"
)
else:
await update.message.reply_text(
"用法: <code>/schedule on</code> 或 <code>/schedule off</code>",
parse_mode="HTML"
)
@admin_only
async def cmd_schedule_config(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""配置调度器参数"""
import config as cfg
import tomli_w
args = context.args
if not args:
text = (
f"<b>⚙️ 调度器配置</b>\n\n"
f"<code>start_hour</code> = {cfg.SCHEDULER_START_HOUR} (开始时间)\n"
f"<code>end_hour</code> = {cfg.SCHEDULER_END_HOUR} (结束时间)\n"
f"<code>batch_size</code> = {cfg.SCHEDULER_BATCH_SIZE} (每轮注册数)\n"
f"<code>cooldown</code> = {cfg.SCHEDULER_COOLDOWN_MINUTES} (冷却分钟)\n"
f"<code>max_failures</code> = {cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES} (连续失败上限)\n\n"
f"修改示例:\n"
f" <code>/schedule_config batch_size 30</code>\n"
f" <code>/schedule_config start_hour 9</code>\n"
f" <code>/schedule_config cooldown 10</code>"
)
await update.message.reply_text(text, parse_mode="HTML")
return
if len(args) < 2:
await update.message.reply_text("❌ 用法: <code>/schedule_config 参数名 值</code>", parse_mode="HTML")
return
key = args[0].lower()
try:
value = int(args[1])
except ValueError:
await update.message.reply_text("❌ 值必须为整数")
return
# 参数映射
config_map = {
"start_hour": ("start_hour", 0, 23),
"end_hour": ("end_hour", 0, 23),
"batch_size": ("batch_size", 1, 200),
"cooldown": ("cooldown_minutes", 1, 60),
"max_failures": ("max_consecutive_failures", 1, 20),
}
if key not in config_map:
valid_keys = ", ".join(config_map.keys())
await update.message.reply_text(f"❌ 无效参数。可选: {valid_keys}")
return
toml_key, min_val, max_val = config_map[key]
if not (min_val <= value <= max_val):
await update.message.reply_text(f"❌ 值必须在 {min_val}-{max_val} 之间")
return
# 更新内存中的配置
attr_name = f"SCHEDULER_{toml_key.upper()}"
old_value = getattr(cfg, attr_name)
setattr(cfg, attr_name, value)
# 持久化到 config.toml
try:
import tomllib
with open(CONFIG_FILE, "rb") as f:
config = tomllib.load(f)
if "scheduler" not in config:
config["scheduler"] = {}
config["scheduler"][toml_key] = value
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
await update.message.reply_text(
f"<b>✅ 配置已更新</b>\n\n"
f"<code>{key}</code>: {old_value}{value}\n\n"
f"💡 如调度器正在运行,下轮生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
f"⚠️ 内存配置已更新 ({key}: {old_value}{value})\n"
"但无法持久化: 缺少 tomli_w 依赖"
)
except Exception as e:
await update.message.reply_text(
f"⚠️ 内存配置已更新 ({key}: {old_value}{value})\n"
f"但持久化失败: {e}"
)
@admin_only
async def cmd_schedule_status(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看调度器运行状态"""
import config as cfg
if not self._scheduler_active:
status = "⏸️ 已启用 (等待触发)" if cfg.SCHEDULER_ENABLED else "❌ 未启用"
await update.message.reply_text(
f"<b>📊 调度器状态</b>\n\n"
f"状态: {status}\n\n"
f"使用 <code>/schedule on</code> 启动",
parse_mode="HTML"
)
return
stats = self._scheduler_stats
elapsed = ""
if stats["start_time"]:
delta = datetime.now() - stats["start_time"]
hours, remainder = divmod(int(delta.total_seconds()), 3600)
minutes, seconds = divmod(remainder, 60)
elapsed = f"{hours}h{minutes}m{seconds}s"
text = (
f"<b>📊 调度器运行状态</b>\n\n"
f"🟢 运行中 | 第 {self._scheduler_round}\n"
f"⏱️ 已运行: {elapsed}\n"
f"⏰ 窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n\n"
f"<b>累计统计:</b>\n"
f" 📝 注册: {stats['total_registered']}\n"
f" 📥 入库: {stats['total_ingested']}\n"
f" ❌ 失败轮次: {stats['total_failed']}\n"
f" 🔄 总轮次: {stats['total_rounds']}\n"
f" ⚠️ 连续失败: {stats['consecutive_failures']}/{cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES}"
)
if self.current_team:
text += f"\n\n🏃 当前任务: {self.current_team}"
await update.message.reply_text(text, parse_mode="HTML")
def _register_scheduler_daily_job(self):
"""注册每日定时触发 Job"""
import config as cfg
# 先移除旧的 job
self._remove_scheduler_daily_job()
if self.app and self.app.job_queue:
trigger_time = dt_time(hour=cfg.SCHEDULER_START_HOUR, minute=0, second=0, tzinfo=BEIJING_TZ)
self.app.job_queue.run_daily(
self._scheduler_daily_trigger,
time=trigger_time,
name="scheduler_daily"
)
log.info(f"Scheduler daily job registered at {cfg.SCHEDULER_START_HOUR:02d}:00 (Beijing Time)")
def _remove_scheduler_daily_job(self):
"""移除每日定时触发 Job"""
if self.app and self.app.job_queue:
jobs = self.app.job_queue.get_jobs_by_name("scheduler_daily")
for job in jobs:
job.schedule_removal()
async def _scheduler_daily_trigger(self, context: ContextTypes.DEFAULT_TYPE):
"""每日定时触发回调 - 由 job_queue 调用"""
import config as cfg
if not cfg.SCHEDULER_ENABLED:
return
# 检查是否被 /stop 挂起了今天的调度
today = datetime.now().date()
if self._scheduler_suspended_date == today:
log.info(f"Scheduler suspended for today ({today}), skipping")
return
if self._scheduler_active:
log.info("Scheduler already active, skipping daily trigger")
return
if self.current_task and not self.current_task.done():
log.warning("Task already running, skipping scheduler trigger")
# 通知管理员
for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
try:
await context.bot.send_message(
chat_id,
f"⚠️ <b>调度器触发跳过</b>\n\n"
f"当前有任务在运行: {self.current_team}\n"
f"请任务完成后手动使用 /schedule on 启动",
parse_mode="HTML"
)
except Exception:
pass
return
# 启动调度器
self._start_scheduler()
# 通知管理员
for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
try:
await context.bot.send_message(
chat_id,
f"<b>⏰ 定时调度器已启动</b>\n\n"
f"时间窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
f"每轮: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 → run_all → 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟",
parse_mode="HTML"
)
except Exception:
pass
def _start_scheduler(self):
"""启动调度器循环"""
self._scheduler_active = True
self._scheduler_stop_event.clear()
self._scheduler_round = 0
self._scheduler_stats = {
"total_rounds": 0,
"total_registered": 0,
"total_ingested": 0,
"total_failed": 0,
"consecutive_failures": 0,
"start_time": datetime.now(),
}
self._scheduler_task = asyncio.create_task(self._scheduler_loop())
async def _scheduler_loop(self):
"""调度器主循环: 注册 → run_all → 冷却 → 重复"""
import config as cfg
log.section("定时调度器启动")
chat_id = TELEGRAM_ADMIN_CHAT_IDS[0] if TELEGRAM_ADMIN_CHAT_IDS else None
try:
while not self._scheduler_stop_event.is_set():
# ===== 时间窗口检查 =====
now = datetime.now()
if not (cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR):
log.info(f"超出时间窗口 ({cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00),调度结束")
break
self._scheduler_round += 1
round_num = self._scheduler_round
log.section(f"调度器 - 第 {round_num}")
# ===== Phase 1: Provisioning (注册 GPT Team 账号) =====
log.info(f"[Phase 1] 注册 {cfg.SCHEDULER_BATCH_SIZE} 个 GPT Team 账号...")
if chat_id:
try:
await self.app.bot.send_message(
chat_id,
f"<b>🔄 第 {round_num} 轮开始</b>\n\n"
f"📝 Phase 1: 注册 {cfg.SCHEDULER_BATCH_SIZE} 个账号...\n"
f"{now.strftime('%H:%M:%S')}",
parse_mode="HTML"
)
except Exception:
pass
# 重置停止标志
try:
import run
run._shutdown_requested = False
except Exception:
pass
self.current_team = f"调度器-第{round_num}轮-注册"
# 执行注册
reg_success = 0
reg_fail = 0
try:
reg_success, reg_fail = await self._scheduler_run_registration(
chat_id, cfg.SCHEDULER_BATCH_SIZE, cfg.SCHEDULER_OUTPUT_TYPE
)
except Exception as e:
log.error(f"调度器注册阶段异常: {e}")
reg_fail = cfg.SCHEDULER_BATCH_SIZE
self._scheduler_stats["total_registered"] += reg_success
# 检查中断
if self._scheduler_stop_event.is_set():
break
# 注册全部失败检查
if reg_success == 0:
self._scheduler_stats["consecutive_failures"] += 1
self._scheduler_stats["total_failed"] += 1
log.warning(f"{round_num} 轮注册全部失败 (连续失败: {self._scheduler_stats['consecutive_failures']})")
if self._scheduler_stats["consecutive_failures"] >= cfg.SCHEDULER_MAX_CONSECUTIVE_FAILURES:
log.error(f"连续失败 {self._scheduler_stats['consecutive_failures']} 轮,调度器自动暂停")
if chat_id:
try:
await self.app.bot.send_message(
chat_id,
f"<b>🚨 调度器自动暂停</b>\n\n"
f"连续 {self._scheduler_stats['consecutive_failures']} 轮注册全部失败\n"
f"请检查配置后使用 /schedule on 重新启动\n\n"
f"可能原因:\n"
f"• IBAN 已用完\n"
f"• 邮件 API 异常\n"
f"• 域名全部拉黑",
parse_mode="HTML"
)
except Exception:
pass
break
continue # 跳过 run_all直接下一轮
else:
self._scheduler_stats["consecutive_failures"] = 0
# ===== Phase 1.5: Verify (验证 account_id) =====
log.info(f"[Phase 1.5] 验证注册账号 account_id...")
if chat_id:
try:
await self.app.bot.send_message(
chat_id,
f"<b>🔍 第 {round_num} 轮 - Phase 1.5</b>\n\n"
f"📝 注册完成: ✅ {reg_success} / ❌ {reg_fail}\n"
f"🔍 正在验证 account_id...",
parse_mode="HTML"
)
except Exception:
pass
self.current_team = f"调度器-第{round_num}轮-验证"
try:
await self._validate_and_cleanup_accounts(chat_id, force_all=True)
log.success("verify_all 完成")
except Exception as e:
log.error(f"调度器验证阶段异常: {e}")
# 检查中断
if self._scheduler_stop_event.is_set():
break
# ===== Phase 2: Ingestion (run_all) =====
log.info(f"[Phase 2] 执行 run_all 入库处理...")
if chat_id:
try:
await self.app.bot.send_message(
chat_id,
f"<b>🚀 第 {round_num} 轮 - Phase 2</b>\n\n"
f"📝 注册: ✅ {reg_success} / ❌ {reg_fail}\n"
f"🔍 验证完成\n"
f"📥 开始 run_all 入库处理...",
parse_mode="HTML"
)
except Exception:
pass
self.current_team = f"调度器-第{round_num}轮-run_all"
ingested = 0
try:
ingested = await self._scheduler_run_all()
except Exception as e:
log.error(f"调度器 run_all 阶段异常: {e}")
self._scheduler_stats["total_ingested"] += ingested
self._scheduler_stats["total_rounds"] += 1
# 检查中断
if self._scheduler_stop_event.is_set():
break
# ===== Graceful Boundary: 任务完成后检查时间 =====
now = datetime.now()
if not (cfg.SCHEDULER_START_HOUR <= now.hour < cfg.SCHEDULER_END_HOUR):
log.info(f"run_all 完成后已超出时间窗口 ({now.strftime('%H:%M:%S')}),调度结束")
if chat_id:
try:
await self.app.bot.send_message(
chat_id,
f"<b>⏰ 第 {round_num} 轮完成后已超出时间窗口</b>\n\n"
f"当前: {now.strftime('%H:%M:%S')}\n"
f"窗口: {cfg.SCHEDULER_START_HOUR:02d}:00 - {cfg.SCHEDULER_END_HOUR:02d}:00\n"
f"本日调度结束",
parse_mode="HTML"
)
except Exception:
pass
break
# ===== Phase 3: Cooldown =====
cooldown = cfg.SCHEDULER_COOLDOWN_MINUTES * 60
log.info(f"[Phase 3] 冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟...")
if chat_id:
try:
next_time = now + timedelta(seconds=cooldown)
await self.app.bot.send_message(
chat_id,
f"<b>⏳ 第 {round_num} 轮完成</b>\n\n"
f"✅ 注册: {reg_success} | 📥 入库: {ingested}\n"
f"冷却 {cfg.SCHEDULER_COOLDOWN_MINUTES} 分钟...\n"
f"下轮预计: {next_time.strftime('%H:%M:%S')}",
parse_mode="HTML"
)
except Exception:
pass
self.current_team = f"调度器-冷却中"
# 可中断的等待
try:
await asyncio.wait_for(
self._scheduler_stop_event.wait(),
timeout=cooldown
)
# 如果 wait 返回,说明收到了停止信号
break
except asyncio.TimeoutError:
# 正常超时,继续下一轮
pass
except Exception as e:
log.error(f"调度器异常退出: {e}")
finally:
self._scheduler_active = False
self.current_task = None
self.current_team = None
log.info("调度器已停止")
# 发送日报
await self._send_daily_report()
async def _scheduler_run_registration(self, chat_id: int, count: int, output_type: str) -> tuple:
"""调度器: 执行注册阶段,返回 (success_count, fail_count)"""
from auto_gpt_team import run_single_registration_auto, cleanup_chrome_processes, get_register_mode
from config import CONCURRENT_ENABLED, CONCURRENT_WORKERS
import json
import threading
results = []
success_count = 0
fail_count = 0
results_lock = threading.Lock()
current_mode = get_register_mode()
workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1
workers = min(workers, count)
task_queue = list(range(count))
queue_lock = threading.Lock()
def worker_task(worker_id: int):
nonlocal success_count, fail_count
while True:
try:
import run
if run._shutdown_requested:
break
except Exception:
pass
with queue_lock:
if not task_queue:
break
task_idx = task_queue.pop(0)
try:
result = run_single_registration_auto(
progress_callback=None,
step_callback=None
)
with results_lock:
if result.get("stopped"):
break
elif result.get("success"):
success_count += 1
results.append({
"account": result["account"],
"password": result["password"],
"token": result["token"],
"account_id": result.get("account_id", "")
})
else:
fail_count += 1
except Exception as e:
with results_lock:
fail_count += 1
log.error(f"Scheduler Worker {worker_id}: 注册异常: {e}")
cleanup_chrome_processes()
# 使用线程池并发执行
import concurrent.futures
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
async_futures = [loop.run_in_executor(executor, worker_task, i) for i in range(workers)]
await asyncio.gather(*async_futures, return_exceptions=True)
# 保存结果到 team.json
if results and output_type == "team":
try:
team_file = TEAM_JSON_FILE
existing = []
if team_file.exists():
with open(team_file, "r", encoding="utf-8") as f:
existing = json.load(f)
existing.extend(results)
with open(team_file, "w", encoding="utf-8") as f:
json.dump(existing, f, ensure_ascii=False, indent=2)
reload_config()
log.success(f"调度器: {success_count} 个账号已写入 team.json (总计 {len(existing)})")
except Exception as e:
log.error(f"调度器: 保存 team.json 失败: {e}")
return (success_count, fail_count)
async def _scheduler_run_all(self) -> int:
"""调度器: 执行 run_all返回成功入库数"""
from run import run_all_teams
from team_service import preload_all_account_ids
from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
from config import DEFAULT_PASSWORD
loop = asyncio.get_event_loop()
def _task():
preload_all_account_ids()
_tracker = load_team_tracker()
add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
save_team_tracker(_tracker)
return run_all_teams()
results = await loop.run_in_executor(self.executor, _task)
# 自动清理 team.json 和 team_tracker.json
await self._auto_clean_after_run_all()
# 统计成功入库数
ingested = sum(1 for r in (results or []) if r.get("status") in ("success", "completed"))
return ingested
async def _send_daily_report(self):
"""发送调度器日报"""
stats = self._scheduler_stats
if stats["total_rounds"] == 0 and stats["total_registered"] == 0:
return # 没有任何数据,不发送
elapsed = ""
if stats["start_time"]:
delta = datetime.now() - stats["start_time"]
hours, remainder = divmod(int(delta.total_seconds()), 3600)
minutes, _ = divmod(remainder, 60)
elapsed = f"{hours}h{minutes}m"
text = (
f"<b>📊 调度器日报</b>\n\n"
f"⏱️ 运行时长: {elapsed}\n"
f"🔄 总轮次: {stats['total_rounds']}\n\n"
f"<b>累计结果:</b>\n"
f" 📝 注册成功: {stats['total_registered']}\n"
f" 📥 入库成功: {stats['total_ingested']}\n"
f" ❌ 失败轮次: {stats['total_failed']}\n"
)
if stats["consecutive_failures"] >= 1:
text += f"\n⚠️ 最终连续失败: {stats['consecutive_failures']}"
for chat_id in TELEGRAM_ADMIN_CHAT_IDS:
try:
await self.app.bot.send_message(
chat_id,
text,
parse_mode="HTML"
)
except Exception:
pass
async def main():
"""主函数"""