package telegram

import (
	"context"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"proxyrotator/internal/config"
	"proxyrotator/internal/model"
	"proxyrotator/internal/store"
	"proxyrotator/internal/tester"
)

// Scheduler periodically health-checks proxies and raises an alert through
// the Notifier when the share of live proxies drops below the configured
// threshold.
type Scheduler struct {
	mu       sync.Mutex // guards ticker, stopChan and running
	store    store.ProxyStore
	notifier *Notifier
	tester   *tester.HTTPTester
	cfg      *config.Config
	ticker   *time.Ticker
	stopChan chan struct{}
	running  bool
}

// NewScheduler creates a stopped Scheduler. Call Start to begin the periodic
// checks.
func NewScheduler(store store.ProxyStore, notifier *Notifier, cfg *config.Config) *Scheduler {
	return &Scheduler{
		store:    store,
		notifier: notifier,
		tester:   tester.NewHTTPTester(),
		cfg:      cfg,
		stopChan: make(chan struct{}),
	}
}

// Start launches the scheduling goroutine. It is a no-op when the scheduler
// is already running. The interval comes from cfg.TelegramTestIntervalMin and
// is clamped to a minimum of five minutes.
func (s *Scheduler) Start() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if s.running {
		return
	}

	interval := time.Duration(s.cfg.TelegramTestIntervalMin) * time.Minute
	if interval < 5*time.Minute {
		interval = 5 * time.Minute
	}

	s.ticker = time.NewTicker(interval)
	s.stopChan = make(chan struct{})
	s.running = true

	// Hand this run's ticker and stop channel to the goroutine explicitly.
	// Reading them back from the struct inside loop would race with a later
	// Start that replaces the fields, and could leave a stale goroutine
	// attached to the new run.
	go s.loop(s.ticker, s.stopChan)

	slog.Info("telegram scheduler started", "interval", interval)
}

// Stop halts the scheduling goroutine. It is a no-op when the scheduler is
// not running. Stop does not wait for a check that is already in progress.
func (s *Scheduler) Stop() {
	s.mu.Lock()
	defer s.mu.Unlock()

	if !s.running {
		return
	}

	if s.ticker != nil {
		s.ticker.Stop()
	}
	close(s.stopChan)
	s.running = false

	slog.Info("telegram scheduler stopped")
}

// loop runs one check per tick until stop is closed. It only touches the
// ticker and channel it was given, never the Scheduler fields, so it needs
// no locking.
func (s *Scheduler) loop(ticker *time.Ticker, stop <-chan struct{}) {
	for {
		select {
		case <-stop:
			return
		case <-ticker.C:
			if err := s.RunTest(context.Background()); err != nil {
				slog.Error("scheduled test failed", "error", err)
			}
		}
	}
}

// RunTest runs a health check across all groups.
func (s *Scheduler) RunTest(ctx context.Context) error {
	return s.RunTestWithGroup(ctx, "")
}

// RunTestWithGroup runs a health check for the given group; the empty string
// is what RunTest passes to cover every group.
//
// It lists up to 1000 enabled proxies whose status is unknown or alive, tests
// them in one batch, records each outcome in the store, and sends an alert
// when the alive percentage is below cfg.TelegramAlertThreshold.
//
// An error is returned only when the proxy list cannot be fetched. Failures
// to persist an individual result are logged and do not abort the run.
func (s *Scheduler) RunTestWithGroup(ctx context.Context, group string) error {
	slog.Info("running scheduled proxy test", "group", group)

	// Select the proxies to test.
	query := model.ProxyQuery{
		Group:       group,
		StatusIn:    []model.ProxyStatus{model.StatusUnknown, model.StatusAlive},
		OnlyEnabled: true,
		Limit:       1000,
	}

	proxies, err := s.store.List(ctx, query)
	if err != nil {
		return fmt.Errorf("listing proxies for group %q: %w", group, err)
	}

	if len(proxies) == 0 {
		slog.Info("no proxies to test")
		return nil
	}

	// Build the test specification.
	spec := model.TestSpec{
		URL:     s.cfg.TelegramTestURL,
		Method:  "GET",
		Timeout: time.Duration(s.cfg.TelegramTestTimeoutMs) * time.Millisecond,
	}

	// Run the batch; 50 is the batch argument the original code passes
	// (presumably the concurrency limit — confirm against tester.TestBatch).
	results := s.tester.TestBatch(ctx, proxies, spec, 50)

	// Record each outcome and tally the totals.
	alive, dead := 0, 0
	for _, r := range results {
		// Copy values out of the range variable before taking addresses so
		// the patch never aliases storage shared between iterations.
		checkedAt := r.CheckedAt

		var updErr error
		if r.OK {
			alive++
			status := model.StatusAlive
			latency := r.LatencyMs
			updErr = s.store.UpdateHealth(ctx, r.ProxyID, model.HealthPatch{
				Status:     &status,
				ScoreDelta: 1,
				SuccessInc: 1,
				LatencyMs:  &latency,
				CheckedAt:  &checkedAt,
			})
		} else {
			dead++
			status := model.StatusDead
			updErr = s.store.UpdateHealth(ctx, r.ProxyID, model.HealthPatch{
				Status:     &status,
				ScoreDelta: -3,
				FailInc:    1,
				CheckedAt:  &checkedAt,
			})
		}

		// Persisting is best-effort: surface the failure but keep going so
		// one bad write does not discard the rest of the batch.
		if updErr != nil {
			slog.Warn("updating proxy health failed", "proxy_id", r.ProxyID, "error", updErr)
		}
	}

	slog.Info("scheduled test completed",
		"tested", len(results),
		"alive", alive,
		"dead", dead,
	)

	// Alert when the alive share falls below the threshold.
	total := len(results)
	if total > 0 {
		alivePercent := float64(alive) / float64(total) * 100
		if alivePercent < float64(s.cfg.TelegramAlertThreshold) {
			s.notifier.SendAlert(ctx, alive, dead, total, alivePercent)
		}
	}

	return nil
}