168 lines
3.4 KiB
Go
168 lines
3.4 KiB
Go
package telegram
|
|
|
|
import (
|
|
"context"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"proxyrotator/internal/config"
|
|
"proxyrotator/internal/model"
|
|
"proxyrotator/internal/store"
|
|
"proxyrotator/internal/tester"
|
|
)
|
|
|
|
// Scheduler 定时测活调度器
|
|
type Scheduler struct {
|
|
mu sync.Mutex
|
|
store store.ProxyStore
|
|
notifier *Notifier
|
|
tester *tester.HTTPTester
|
|
cfg *config.Config
|
|
|
|
ticker *time.Ticker
|
|
stopChan chan struct{}
|
|
running bool
|
|
}
|
|
|
|
// NewScheduler 创建调度器
|
|
func NewScheduler(store store.ProxyStore, notifier *Notifier, cfg *config.Config) *Scheduler {
|
|
return &Scheduler{
|
|
store: store,
|
|
notifier: notifier,
|
|
tester: tester.NewHTTPTester(),
|
|
cfg: cfg,
|
|
stopChan: make(chan struct{}),
|
|
}
|
|
}
|
|
|
|
// Start 启动调度器
|
|
func (s *Scheduler) Start() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.running {
|
|
return
|
|
}
|
|
|
|
interval := time.Duration(s.cfg.TelegramTestIntervalMin) * time.Minute
|
|
if interval < 5*time.Minute {
|
|
interval = 5 * time.Minute
|
|
}
|
|
|
|
s.ticker = time.NewTicker(interval)
|
|
s.stopChan = make(chan struct{})
|
|
s.running = true
|
|
|
|
go s.loop()
|
|
slog.Info("telegram scheduler started", "interval", interval)
|
|
}
|
|
|
|
// Stop 停止调度器
|
|
func (s *Scheduler) Stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if !s.running {
|
|
return
|
|
}
|
|
|
|
if s.ticker != nil {
|
|
s.ticker.Stop()
|
|
}
|
|
close(s.stopChan)
|
|
s.running = false
|
|
slog.Info("telegram scheduler stopped")
|
|
}
|
|
|
|
// loop 调度循环
|
|
func (s *Scheduler) loop() {
|
|
for {
|
|
select {
|
|
case <-s.stopChan:
|
|
return
|
|
case <-s.ticker.C:
|
|
if err := s.RunTest(context.Background()); err != nil {
|
|
slog.Error("scheduled test failed", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// RunTest 执行测活(所有分组)
|
|
func (s *Scheduler) RunTest(ctx context.Context) error {
|
|
return s.RunTestWithGroup(ctx, "")
|
|
}
|
|
|
|
// RunTestWithGroup 执行测活(指定分组)
|
|
func (s *Scheduler) RunTestWithGroup(ctx context.Context, group string) error {
|
|
slog.Info("running scheduled proxy test", "group", group)
|
|
|
|
// 获取待测试代理
|
|
query := model.ProxyQuery{
|
|
Group: group,
|
|
StatusIn: []model.ProxyStatus{model.StatusUnknown, model.StatusAlive},
|
|
OnlyEnabled: true,
|
|
Limit: 1000,
|
|
}
|
|
|
|
proxies, err := s.store.List(ctx, query)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(proxies) == 0 {
|
|
slog.Info("no proxies to test")
|
|
return nil
|
|
}
|
|
|
|
// 构建测试规格
|
|
spec := model.TestSpec{
|
|
URL: s.cfg.TelegramTestURL,
|
|
Method: "GET",
|
|
Timeout: time.Duration(s.cfg.TelegramTestTimeoutMs) * time.Millisecond,
|
|
}
|
|
|
|
// 执行测试
|
|
results := s.tester.TestBatch(ctx, proxies, spec, 50)
|
|
|
|
// 统计结果
|
|
alive, dead := 0, 0
|
|
for _, r := range results {
|
|
now := r.CheckedAt
|
|
if r.OK {
|
|
alive++
|
|
status := model.StatusAlive
|
|
_ = s.store.UpdateHealth(ctx, r.ProxyID, model.HealthPatch{
|
|
Status: &status,
|
|
ScoreDelta: 1,
|
|
SuccessInc: 1,
|
|
LatencyMs: &r.LatencyMs,
|
|
CheckedAt: &now,
|
|
})
|
|
} else {
|
|
dead++
|
|
status := model.StatusDead
|
|
_ = s.store.UpdateHealth(ctx, r.ProxyID, model.HealthPatch{
|
|
Status: &status,
|
|
ScoreDelta: -3,
|
|
FailInc: 1,
|
|
CheckedAt: &now,
|
|
})
|
|
}
|
|
}
|
|
|
|
slog.Info("scheduled test completed", "tested", len(results), "alive", alive, "dead", dead)
|
|
|
|
// 检查是否需要告警
|
|
total := len(results)
|
|
if total > 0 {
|
|
alivePercent := float64(alive) / float64(total) * 100
|
|
if alivePercent < float64(s.cfg.TelegramAlertThreshold) {
|
|
s.notifier.SendAlert(ctx, alive, dead, total, alivePercent)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|