Files
codexautopool/backend/internal/api/error_cleaner.go

314 lines
7.9 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package api
import (
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"sync"
"time"
"codex-pool/internal/config"
"codex-pool/internal/database"
"codex-pool/internal/logger"
)
// Package-level state shared by the error-account cleaner service.
var (
// cleanerRunning is set to true by StartErrorCleanerService and back to false
// by StopErrorCleanerService; GetCleanerStatus reports it as "running".
cleanerRunning bool
// cleanerMu is held by StartErrorCleanerService, StopErrorCleanerService and
// GetCleanerStatus while they access the variables in this block.
cleanerMu sync.Mutex
// cleanerStopChan is created when the service starts and closed when it is
// stopped; the background goroutine exits once it observes the close.
cleanerStopChan chan struct{}
// lastCleanTime is set by checkAndCleanErrors when a cleanup pass completes
// (including passes that find nothing to delete) and is reported by
// GetCleanerStatus.
lastCleanTime time.Time
)
// StartErrorCleanerService starts the background goroutine that periodically
// runs checkAndCleanErrors. Calling it while the service is already marked as
// running is a no-op.
//
// Before every wait the goroutine re-reads the "error_clean_interval" setting
// (in seconds). A missing, non-numeric or smaller-than-60 value falls back to
// the default of 3600 seconds. Whether a pass actually deletes anything is
// decided inside checkAndCleanErrors.
func StartErrorCleanerService() {
	cleanerMu.Lock()
	if cleanerRunning {
		cleanerMu.Unlock()
		return
	}
	cleanerRunning = true
	// Keep a private reference to this run's stop channel. The goroutine must
	// not re-read the package-level variable: after a Stop followed by a new
	// Start it would pick up the new run's channel and never terminate.
	stop := make(chan struct{})
	cleanerStopChan = stop
	cleanerMu.Unlock()

	logger.Info("错误账号清理服务已启动(需在配置中启用)", "", "cleaner")

	go func() {
		for {
			// Cleaning interval in seconds (default 3600 = 1 hour).
			cleanInterval := 3600
			if database.Instance != nil {
				// A lookup error is treated the same as "not configured":
				// the default interval stays in effect.
				if val, _ := database.Instance.GetConfig("error_clean_interval"); val != "" {
					if v, err := strconv.Atoi(val); err == nil && v >= 60 {
						cleanInterval = v
					}
				}
			}

			select {
			case <-stop:
				logger.Info("错误账号清理服务已停止", "", "cleaner")
				return
			case <-time.After(time.Duration(cleanInterval) * time.Second):
				checkAndCleanErrors()
			}
		}
	}()
}
// StopErrorCleanerService closes the stop channel and marks the service as not
// running. It does nothing when the service is not marked as running or when
// no stop channel has been created.
func StopErrorCleanerService() {
	cleanerMu.Lock()
	defer cleanerMu.Unlock()

	if !cleanerRunning || cleanerStopChan == nil {
		return
	}
	cleanerRunning = false
	close(cleanerStopChan)
}
// getTotalAccountCount queries the S2A admin dashboard endpoint and returns the
// "total_accounts" value from its JSON response.
//
// An error is returned when the S2A base URL is not configured, the request
// cannot be built or sent, the response body cannot be read, the server answers
// with a non-2xx status, or the body cannot be decoded as JSON.
func getTotalAccountCount() (int, error) {
	if config.Global == nil || config.Global.S2AApiBase == "" {
		return 0, fmt.Errorf("S2A 配置未设置")
	}

	client := &http.Client{Timeout: 30 * time.Second}
	endpoint := fmt.Sprintf("%s/api/v1/admin/dashboard", config.Global.S2AApiBase)

	req, err := http.NewRequest(http.MethodGet, endpoint, nil)
	if err != nil {
		return 0, err
	}
	setS2AHeaders(req)

	resp, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return 0, fmt.Errorf("reading dashboard response: %w", err)
	}
	// client.Do does not treat HTTP error statuses as errors, so an error page
	// that happens to be valid JSON would otherwise decode to a count of 0.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return 0, fmt.Errorf("dashboard request returned HTTP %d", resp.StatusCode)
	}

	var result struct {
		Code int `json:"code"`
		Data struct {
			TotalAccounts int `json:"total_accounts"`
		} `json:"data"`
	}
	if err := json.Unmarshal(body, &result); err != nil {
		return 0, err
	}
	return result.Data.TotalAccounts, nil
}
// triggerAutoRefill starts an asynchronous team-processing run intended to
// replace roughly count deleted accounts.
//
// The number of pending owners to process is ceil(count / membersPerTeam),
// capped at the number of pending owners and never less than one. Tuning
// values are read from the "monitor_*" settings; missing or invalid values
// fall back to the defaults below. The function returns without doing
// anything when the database is unavailable, the pending owners cannot be
// loaded, or there are none.
func triggerAutoRefill(count int) {
	if database.Instance == nil {
		return
	}

	// Pending owners are the input for the refill run.
	pendingOwners, err := database.Instance.GetPendingOwners()
	if err != nil {
		logger.Warning(fmt.Sprintf("自动补号跳过: 获取母号失败: %v", err), "", "cleaner")
		return
	}
	if len(pendingOwners) == 0 {
		logger.Warning("自动补号跳过: 无可用母号", "", "cleaner")
		return
	}

	// positiveInt reads an integer setting and returns fallback when the
	// value is missing, unreadable, non-numeric or not greater than zero.
	positiveInt := func(key string, fallback int) int {
		val, _ := database.Instance.GetConfig(key)
		if val == "" {
			return fallback
		}
		if v, err := strconv.Atoi(val); err == nil && v > 0 {
			return v
		}
		return fallback
	}

	// Each owner yields membersPerTeam accounts (4 by default).
	membersPerTeam := positiveInt("monitor_members_per_team", 4)

	// Round up so that the requested count is covered, then clamp to what is
	// available; always process at least one owner.
	needOwners := (count + membersPerTeam - 1) / membersPerTeam
	if needOwners > len(pendingOwners) {
		needOwners = len(pendingOwners)
	}
	if needOwners < 1 {
		needOwners = 1
	}

	concurrentTeams := positiveInt("monitor_concurrent_teams", 2)
	s2aConcurrency := positiveInt("monitor_s2a_concurrency", 2)

	browserType := "chromedp"
	if val, _ := database.Instance.GetConfig("monitor_browser_type"); val != "" {
		browserType = val
	}

	proxy := ""
	if val, _ := database.Instance.GetConfig("monitor_replenish_use_proxy"); val == "true" {
		// Guard against an unloaded global config so that the cleaner
		// goroutine cannot panic on a nil dereference here.
		if config.Global != nil {
			proxy = config.Global.GetProxy()
		} else {
			logger.Warning("自动补号: 全局配置未加载, 不使用代理", "", "cleaner")
		}
	}

	logger.Info(fmt.Sprintf("自动补号启动: 处理 %d 个母号,预计补充 %d 个账号", needOwners, needOwners*membersPerTeam), "", "cleaner")

	// needOwners is within [1, len(pendingOwners)] at this point, so the
	// slice expression below cannot go out of range.
	owners := make([]TeamOwner, 0, needOwners)
	for _, owner := range pendingOwners[:needOwners] {
		owners = append(owners, TeamOwner{
			Email:     owner.Email,
			Password:  owner.Password,
			Token:     owner.Token,
			AccountID: owner.AccountID,
		})
	}

	req := TeamProcessRequest{
		Owners:          owners,
		MembersPerTeam:  membersPerTeam,
		ConcurrentTeams: concurrentTeams,
		ConcurrentS2A:   s2aConcurrency,
		BrowserType:     browserType,
		Headless:        true,
		Proxy:           proxy,
		IncludeOwner:    false,
		ProcessCount:    needOwners,
	}

	// Run in its own goroutine so the cleaner loop is not blocked.
	go runTeamProcess(req)
}
// checkAndCleanErrors performs one cleanup pass: when "error_clean_enabled" is
// "true" it fetches the error accounts, deletes them one by one, records the
// time of the pass and, if "auto_refill_on_cleanup" is "true" and enough
// accounts were deleted, triggers an automatic refill.
//
// The refill is triggered when the number of successful deletions reaches
// "auto_refill_threshold" percent (default 25, valid range 1-100) of the total
// account count measured before the cleanup, with a minimum of one account.
// If the total could not be determined the refill check is skipped.
func checkAndCleanErrors() {
	if database.Instance == nil {
		return
	}

	// Automatic cleaning must be switched on explicitly.
	if val, _ := database.Instance.GetConfig("error_clean_enabled"); val != "true" {
		return
	}

	// markCleaned records the completion time under cleanerMu, the same lock
	// GetCleanerStatus holds when it reads lastCleanTime.
	markCleaned := func() {
		cleanerMu.Lock()
		lastCleanTime = time.Now()
		cleanerMu.Unlock()
	}

	// Total before cleaning is only needed for the refill threshold, so a
	// failure here is logged and the pass continues with 0.
	totalBefore, err := getTotalAccountCount()
	if err != nil {
		logger.Warning(fmt.Sprintf("获取总账号数失败: %v", err), "", "cleaner")
		totalBefore = 0
	}

	errorAccounts, err := fetchAllErrorAccounts()
	if err != nil {
		logger.Error(fmt.Sprintf("获取错误账号列表失败: %v", err), "", "cleaner")
		return
	}
	if len(errorAccounts) == 0 {
		logger.Info("无错误账号需要清理", "", "cleaner")
		markCleaned()
		return
	}

	logger.Status(fmt.Sprintf("定期清理错误账号中: 共 %d 个", len(errorAccounts)), "", "cleaner")

	success := 0
	failed := 0
	for _, account := range errorAccounts {
		if err := deleteS2AAccount(account.ID); err != nil {
			failed++
			logger.Warning(fmt.Sprintf("删除账号失败: ID=%d, Email=%s, Error=%v", account.ID, account.Email, err), account.Email, "cleaner")
			continue
		}
		success++
	}

	markCleaned()
	logger.Success(fmt.Sprintf("定期清理错误账号完成: 成功=%d, 失败=%d, 总数=%d", success, failed, len(errorAccounts)), "", "cleaner")

	// Decide whether the deletions warrant an automatic refill.
	if val, _ := database.Instance.GetConfig("auto_refill_on_cleanup"); val != "true" {
		return
	}
	if totalBefore <= 0 || success <= 0 {
		return
	}

	// Threshold is a percentage of the pre-cleanup total (default 25%).
	threshold := 25
	if val, _ := database.Instance.GetConfig("auto_refill_threshold"); val != "" {
		if v, err := strconv.Atoi(val); err == nil && v > 0 && v <= 100 {
			threshold = v
		}
	}

	// Integer division may round down to zero for small pools; require at
	// least one deleted account.
	thresholdCount := totalBefore * threshold / 100
	if thresholdCount < 1 {
		thresholdCount = 1
	}

	if success >= thresholdCount {
		logger.Info(fmt.Sprintf("清理触发自动补号: 已删除 %d 个 (总数 %d, 阈值 %d%%),将补充 %d 个账号",
			success, totalBefore, threshold, success), "", "cleaner")
		triggerAutoRefill(success)
	}
}
// GetCleanerStatus returns a snapshot of the cleaner service state and its
// effective settings.
//
// Keys of the returned map:
//   - "running": whether the service is marked as started.
//   - "enabled": whether "error_clean_enabled" is "true".
//   - "interval": cleaning interval in seconds; configured values below 60 or
//     non-numeric values are reported as the default 3600, matching what the
//     background loop actually uses.
//   - "last_clean_time": lastCleanTime formatted as RFC 3339 (the zero time if
//     no pass has completed yet).
//   - "auto_refill": whether "auto_refill_on_cleanup" is "true".
//   - "auto_refill_threshold": refill threshold in percent; values outside
//     1-100 are reported as the default 25, matching checkAndCleanErrors.
func GetCleanerStatus() map[string]interface{} {
	// Copy the shared state while holding the lock, then release it so the
	// database reads below do not block Start/Stop.
	cleanerMu.Lock()
	running := cleanerRunning
	lastClean := lastCleanTime
	cleanerMu.Unlock()

	enabled := false
	interval := 3600
	autoRefill := false
	autoRefillThreshold := 25

	if db := database.Instance; db != nil {
		if val, _ := db.GetConfig("error_clean_enabled"); val == "true" {
			enabled = true
		}
		if val, _ := db.GetConfig("error_clean_interval"); val != "" {
			if v, err := strconv.Atoi(val); err == nil && v >= 60 {
				interval = v
			}
		}
		if val, _ := db.GetConfig("auto_refill_on_cleanup"); val == "true" {
			autoRefill = true
		}
		if val, _ := db.GetConfig("auto_refill_threshold"); val != "" {
			if v, err := strconv.Atoi(val); err == nil && v > 0 && v <= 100 {
				autoRefillThreshold = v
			}
		}
	}

	return map[string]interface{}{
		"running":               running,
		"enabled":               enabled,
		"interval":              interval,
		"last_clean_time":       lastClean.Format(time.RFC3339),
		"auto_refill":           autoRefill,
		"auto_refill_threshold": autoRefillThreshold,
	}
}