package api

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"sync"
	"time"

	"codex-pool/internal/config"
	"codex-pool/internal/database"
	"codex-pool/internal/logger"
)

// State of the background error-account cleaner. cleanerMu guards
// cleanerRunning, cleanerStopChan and lastCleanTime.
var (
	cleanerRunning  bool
	cleanerMu       sync.Mutex
	cleanerStopChan chan struct{}
	lastCleanTime   time.Time
)

// cleanerConfigValue returns the raw string stored under key, or "" when the
// database is not initialised.
func cleanerConfigValue(key string) string {
	if database.Instance == nil {
		return ""
	}
	// The lookup error is deliberately dropped: every caller falls back to a
	// built-in default when the value is empty or unusable.
	val, _ := database.Instance.GetConfig(key)
	return val
}

// cleanerConfigInt reads key as an integer. It returns def when the value is
// missing, is not an integer, or is rejected by valid (a nil valid accepts
// every integer).
func cleanerConfigInt(key string, def int, valid func(int) bool) int {
	val := cleanerConfigValue(key)
	if val == "" {
		return def
	}
	v, err := strconv.Atoi(val)
	if err != nil {
		return def
	}
	if valid != nil && !valid(v) {
		return def
	}
	return v
}

// cleanerConfigBool reports whether key is set to the literal string "true".
func cleanerConfigBool(key string) bool {
	return cleanerConfigValue(key) == "true"
}

// setLastCleanTime records t as the time of the most recent cleanup pass.
func setLastCleanTime(t time.Time) {
	cleanerMu.Lock()
	lastCleanTime = t
	cleanerMu.Unlock()
}

// StartErrorCleanerService starts the periodic error-account cleanup loop in
// a background goroutine. It is a no-op when the service is already running.
// The loop only deletes anything when "error_clean_enabled" is "true".
func StartErrorCleanerService() {
	cleanerMu.Lock()
	if cleanerRunning {
		cleanerMu.Unlock()
		return
	}
	cleanerRunning = true
	stop := make(chan struct{})
	cleanerStopChan = stop
	cleanerMu.Unlock()

	logger.Info("错误账号清理服务已启动(需在配置中启用)", "", "cleaner")

	// Hand the goroutine its own copy of the stop channel. Reading the
	// package-level variable inside the loop would race with a later
	// Stop/Start pair and could leave this goroutine waiting on the new
	// channel forever.
	go runErrorCleaner(stop)
}

// runErrorCleaner runs cleanup passes until stop is closed. The wait between
// passes is re-read from "error_clean_interval" (seconds) before every wait;
// values below 60 or unparsable values fall back to 3600.
func runErrorCleaner(stop <-chan struct{}) {
	for {
		interval := cleanerConfigInt("error_clean_interval", 3600, func(v int) bool { return v >= 60 })

		timer := time.NewTimer(time.Duration(interval) * time.Second)
		select {
		case <-stop:
			timer.Stop()
			logger.Info("错误账号清理服务已停止", "", "cleaner")
			return
		case <-timer.C:
			checkAndCleanErrors()
		}
	}
}

// StopErrorCleanerService signals the cleanup loop to exit. It is a no-op when
// the service is not running. A cleanup pass that is already in progress is
// not interrupted; the loop exits the next time it waits.
func StopErrorCleanerService() {
	cleanerMu.Lock()
	defer cleanerMu.Unlock()

	if cleanerRunning && cleanerStopChan != nil {
		close(cleanerStopChan)
		cleanerStopChan = nil
		cleanerRunning = false
	}
}

// getTotalAccountCount fetches the total number of accounts from the S2A
// admin dashboard endpoint. It returns an error when the S2A base URL is not
// configured, the request fails, the response status is not 200, or the body
// cannot be decoded.
func getTotalAccountCount() (int, error) {
	if config.Global == nil || config.Global.S2AApiBase == "" {
		return 0, fmt.Errorf("S2A 配置未设置")
	}

	client := &http.Client{Timeout: 30 * time.Second}
	url := fmt.Sprintf("%s/api/v1/admin/dashboard", config.Global.S2AApiBase)

	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return 0, fmt.Errorf("building dashboard request: %w", err)
	}
	setS2AHeaders(req)

	resp, err := client.Do(req)
	if err != nil {
		return 0, fmt.Errorf("requesting dashboard: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("reading dashboard response: %w", err)
	}
	// Without this check an error page that happens to be valid JSON would be
	// reported as "0 accounts" with a nil error.
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("dashboard returned HTTP %d", resp.StatusCode)
	}

	var result struct {
		Code int `json:"code"`
		Data struct {
			TotalAccounts int `json:"total_accounts"`
		} `json:"data"`
	}
	if err := json.Unmarshal(body, &result); err != nil {
		return 0, fmt.Errorf("decoding dashboard response: %w", err)
	}

	return result.Data.TotalAccounts, nil
}

// triggerAutoRefill starts an asynchronous team-processing run intended to
// replace roughly count deleted accounts. The number of owner accounts used is
// ceil(count / membersPerTeam), clamped to [1, number of pending owners]. It
// does nothing when the database is unavailable or no pending owner exists.
func triggerAutoRefill(count int) {
	if database.Instance == nil {
		return
	}

	// Pending owner accounts are the input for the refill run.
	pendingOwners, err := database.Instance.GetPendingOwners()
	if err != nil {
		logger.Warning(fmt.Sprintf("自动补号跳过: 获取母号失败: %v", err), "", "cleaner")
		return
	}
	if len(pendingOwners) == 0 {
		logger.Warning("自动补号跳过: 无可用母号", "", "cleaner")
		return
	}

	positive := func(v int) bool { return v > 0 }

	// Each owner yields membersPerTeam accounts (default 4).
	membersPerTeam := cleanerConfigInt("monitor_members_per_team", 4, positive)

	// Ceiling division, then clamp to the owners actually available.
	needOwners := (count + membersPerTeam - 1) / membersPerTeam
	if needOwners > len(pendingOwners) {
		needOwners = len(pendingOwners)
	}
	if needOwners < 1 {
		needOwners = 1
	}

	// Refill tuning knobs, each with a default.
	concurrentTeams := cleanerConfigInt("monitor_concurrent_teams", 2, positive)
	s2aConcurrency := cleanerConfigInt("monitor_s2a_concurrency", 2, positive)

	browserType := "chromedp"
	if val := cleanerConfigValue("monitor_browser_type"); val != "" {
		browserType = val
	}

	proxy := ""
	// Guard config.Global: it is treated as possibly nil elsewhere in this file.
	if cleanerConfigBool("monitor_replenish_use_proxy") && config.Global != nil {
		proxy = config.Global.GetProxy()
	}

	logger.Info(fmt.Sprintf("自动补号启动: 处理 %d 个母号,预计补充 %d 个账号", needOwners, needOwners*membersPerTeam), "", "cleaner")

	// Build the processing request from the first needOwners pending owners.
	owners := make([]TeamOwner, 0, needOwners)
	for i := range pendingOwners[:needOwners] {
		owners = append(owners, TeamOwner{
			Email:     pendingOwners[i].Email,
			Password:  pendingOwners[i].Password,
			Token:     pendingOwners[i].Token,
			AccountID: pendingOwners[i].AccountID,
		})
	}

	req := TeamProcessRequest{
		Owners:          owners,
		MembersPerTeam:  membersPerTeam,
		ConcurrentTeams: concurrentTeams,
		ConcurrentS2A:   s2aConcurrency,
		BrowserType:     browserType,
		Headless:        true,
		Proxy:           proxy,
		IncludeOwner:    false,
		ProcessCount:    needOwners,
	}

	// Run asynchronously so the cleaner loop is not blocked.
	go runTeamProcess(req)
}

// checkAndCleanErrors performs one cleanup pass: when "error_clean_enabled" is
// "true" it fetches all error accounts, deletes them one by one, records the
// time of the pass, and — when "auto_refill_on_cleanup" is "true" and the
// number of deletions reaches the configured percentage of the pre-cleanup
// total — triggers an automatic refill.
func checkAndCleanErrors() {
	if database.Instance == nil {
		return
	}

	// Automatic cleanup is opt-in.
	if !cleanerConfigBool("error_clean_enabled") {
		return
	}

	// Total before cleanup; only used for the refill threshold. A failure here
	// disables the refill for this pass but does not stop the cleanup.
	totalBefore, err := getTotalAccountCount()
	if err != nil {
		logger.Warning(fmt.Sprintf("获取总账号数失败: %v", err), "", "cleaner")
		totalBefore = 0
	}

	errorAccounts, err := fetchAllErrorAccounts()
	if err != nil {
		logger.Error(fmt.Sprintf("获取错误账号列表失败: %v", err), "", "cleaner")
		return
	}

	if len(errorAccounts) == 0 {
		logger.Info("无错误账号需要清理", "", "cleaner")
		setLastCleanTime(time.Now())
		return
	}

	logger.Status(fmt.Sprintf("定期清理错误账号中: 共 %d 个", len(errorAccounts)), "", "cleaner")

	success := 0
	failed := 0
	for _, account := range errorAccounts {
		if err := deleteS2AAccount(account.ID); err != nil {
			failed++
			logger.Warning(fmt.Sprintf("删除账号失败: ID=%d, Email=%s, Error=%v", account.ID, account.Email, err), account.Email, "cleaner")
			continue
		}
		success++
	}

	// Written under cleanerMu because GetCleanerStatus reads it concurrently.
	setLastCleanTime(time.Now())
	logger.Success(fmt.Sprintf("定期清理错误账号完成: 成功=%d, 失败=%d, 总数=%d", success, failed, len(errorAccounts)), "", "cleaner")

	// Decide whether this pass should trigger an automatic refill.
	if !cleanerConfigBool("auto_refill_on_cleanup") || totalBefore <= 0 || success <= 0 {
		return
	}

	// Threshold is a percentage of the pre-cleanup total (default 25).
	threshold := cleanerConfigInt("auto_refill_threshold", 25, func(v int) bool { return v > 0 && v <= 100 })

	thresholdCount := totalBefore * threshold / 100
	if thresholdCount < 1 {
		thresholdCount = 1
	}

	if success >= thresholdCount {
		logger.Info(fmt.Sprintf("清理触发自动补号: 已删除 %d 个 (总数 %d,阈值 %d%%),将补充 %d 个账号", success, totalBefore, threshold, success), "", "cleaner")
		triggerAutoRefill(success)
	}
}

// GetCleanerStatus returns a snapshot of the cleaner service: whether the loop
// is running, the stored configuration values (as stored, without the range
// validation the service applies when it uses them), and the time of the last
// cleanup pass formatted as RFC 3339.
func GetCleanerStatus() map[string]interface{} {
	// Read configuration before taking the mutex so database lookups do not
	// block Start/Stop or a cleanup pass.
	enabled := cleanerConfigBool("error_clean_enabled")
	interval := cleanerConfigInt("error_clean_interval", 3600, nil)
	autoRefill := cleanerConfigBool("auto_refill_on_cleanup")
	autoRefillThreshold := cleanerConfigInt("auto_refill_threshold", 25, nil)

	cleanerMu.Lock()
	running := cleanerRunning
	last := lastCleanTime
	cleanerMu.Unlock()

	return map[string]interface{}{
		"running":               running,
		"enabled":               enabled,
		"interval":              interval,
		"last_clean_time":       last.Format(time.RFC3339),
		"auto_refill":           autoRefill,
		"auto_refill_threshold": autoRefillThreshold,
	}
}