Files
codexautopool/backend/internal/api/auto_add.go

406 lines
11 KiB
Go

package api
import (
"fmt"
"net/http"
"strconv"
"sync"
"time"
"codex-pool/internal/config"
"codex-pool/internal/database"
"codex-pool/internal/logger"
)
var (
autoAddRunning bool
autoAddMu sync.Mutex
autoAddStopChan chan struct{}
lastAutoAddTime time.Time
lastAutoRegTime time.Time // 上次自动注册时间
autoRegisterRunning bool // 自动注册是否正在运行
)
// StartAutoAddService 启动自动补号服务(后台检查器)
func StartAutoAddService() {
autoAddMu.Lock()
if autoAddRunning {
autoAddMu.Unlock()
return
}
autoAddRunning = true
autoAddStopChan = make(chan struct{})
autoAddMu.Unlock()
// 注意:这只是启动后台检查器,实际补号需要前端开启开关
logger.Info("自动补号检查器已启动(需在前端开启开关)", "", "auto-add")
go func() {
// 默认检查间隔 60 秒
checkInterval := 60
for {
// 动态读取检查间隔配置
if database.Instance != nil {
if val, _ := database.Instance.GetConfig("monitor_check_interval"); val != "" {
if v, err := strconv.Atoi(val); err == nil && v >= 10 {
checkInterval = v
}
}
}
select {
case <-autoAddStopChan:
logger.Info("自动补号检查器已停止", "", "auto-add")
return
case <-time.After(time.Duration(checkInterval) * time.Second):
checkAndAutoAdd()
}
}
}()
}
// StopAutoAddService 停止自动补号服务
func StopAutoAddService() {
autoAddMu.Lock()
defer autoAddMu.Unlock()
if autoAddRunning && autoAddStopChan != nil {
close(autoAddStopChan)
autoAddRunning = false
}
}
// checkAndAutoAdd 检查并自动补号
func checkAndAutoAdd() {
if database.Instance == nil {
return
}
// 读取配置
autoAddEnabled := false
if val, _ := database.Instance.GetConfig("monitor_auto_add"); val == "true" {
autoAddEnabled = true
}
if !autoAddEnabled {
// 自动补号未开启,静默返回
return
}
// 检查最小间隔
minInterval := 300
if val, _ := database.Instance.GetConfig("monitor_min_interval"); val != "" {
if v, err := strconv.Atoi(val); err == nil {
minInterval = v
}
}
elapsed := time.Since(lastAutoAddTime).Seconds()
if elapsed < float64(minInterval) {
// 距离上次补号时间不足,显示等待信息
remaining := float64(minInterval) - elapsed
logger.Info(fmt.Sprintf("自动补号: 等待中 (还需 %.0f 秒)", remaining), "", "auto-add")
return
}
// 检查是否有任务在运行
if teamProcessState.Running {
logger.Info("已有任务在运行,跳过自动补号", "", "auto-add")
return
}
// 获取目标数量
target := 50
if val, _ := database.Instance.GetConfig("monitor_target"); val != "" {
if v, err := strconv.Atoi(val); err == nil {
target = v
}
}
// 获取当前 S2A 账号数量
current, err := getS2AAccountCount()
if err != nil {
logger.Error(fmt.Sprintf("获取 S2A 账号数量失败: %v", err), "", "auto-add")
return
}
deficit := target - current
if deficit <= 0 {
logger.Info(fmt.Sprintf("自动补号: 当前 %d >= 目标 %d, 无需补号", current, target), "", "auto-add")
return
}
// 读取每 Team 成员数配置
membersPerTeam := 4
if val, _ := database.Instance.GetConfig("monitor_members_per_team"); val != "" {
if v, err := strconv.Atoi(val); err == nil && v >= 1 && v <= 10 {
membersPerTeam = v
}
}
// 计算需要多少个 Team
teamsNeeded := (deficit + membersPerTeam - 1) / membersPerTeam // 向上取整
// 获取可用的 Owner
owners, err := database.Instance.GetPendingOwners()
if err != nil {
logger.Error(fmt.Sprintf("获取可用 Owner 失败: %v", err), "", "auto-add")
return
}
if len(owners) == 0 {
logger.Warning("没有可用的 Owner 账号,无法自动补号", "", "auto-add")
// 检查是否开启自动注册
tryAutoRegisterOwners(teamsNeeded, 0)
return
}
// 限制使用的 Owner 数量
actualTeams := teamsNeeded
if actualTeams > len(owners) {
logger.Warning(fmt.Sprintf("可用 Owner 不足: 需要 %d 个, 仅有 %d 个可用", teamsNeeded, len(owners)), "", "auto-add")
// 检查是否开启自动注册,补充不足的母号
shortfall := teamsNeeded - len(owners)
tryAutoRegisterOwners(shortfall, len(owners))
actualTeams = len(owners)
}
logger.Info(fmt.Sprintf("自动补号: 当前 %d, 目标 %d, 缺少 %d, 需要 %d 个 Team, 将处理 %d 个",
current, target, deficit, teamsNeeded, actualTeams), "", "auto-add")
// 构建请求 - 直接使用 database.TeamOwner 转换
reqOwners := make([]TeamOwner, actualTeams)
for i := 0; i < actualTeams; i++ {
reqOwners[i] = TeamOwner{
Email: owners[i].Email,
Password: owners[i].Password,
Token: owners[i].Token,
AccountID: owners[i].AccountID,
}
}
// 读取代理配置 - 支持代理池模式
proxyURL := ""
replenishUseProxy := false
if val, _ := database.Instance.GetConfig("monitor_replenish_use_proxy"); val == "true" {
replenishUseProxy = true
}
if replenishUseProxy {
// 使用全局代理配置(支持 pool:random, pool:id:N 等格式)
proxyURL = config.Global.DefaultProxy
}
// 读取浏览器类型配置
browserType := "rod"
if val, _ := database.Instance.GetConfig("monitor_browser_type"); val != "" {
browserType = val
}
// 读取并发 Team 数配置
concurrentTeams := 2
if val, _ := database.Instance.GetConfig("monitor_concurrent_teams"); val != "" {
if v, err := strconv.Atoi(val); err == nil && v >= 1 && v <= 10 {
concurrentTeams = v
}
}
// 读取入库并发数配置
s2aConcurrency := 2
if val, _ := database.Instance.GetConfig("monitor_s2a_concurrency"); val != "" {
if v, err := strconv.Atoi(val); err == nil && v >= 1 && v <= 4 {
s2aConcurrency = v
}
}
req := TeamProcessRequest{
Owners: reqOwners,
MembersPerTeam: membersPerTeam,
ConcurrentTeams: concurrentTeams,
ConcurrentS2A: s2aConcurrency,
IncludeOwner: false,
Headless: true,
BrowserType: browserType,
Proxy: proxyURL,
}
// 输出代理使用状态日志
if proxyURL != "" {
displayProxy := proxyURL
if proxyURL == "pool:random" {
displayProxy = "代理池轮询模式"
} else if len(proxyURL) > 8 && proxyURL[:8] == "pool:id:" {
displayProxy = fmt.Sprintf("代理池固定项 (ID: %s)", proxyURL[8:])
}
logger.Info(fmt.Sprintf("自动补号: 使用代理 %s", displayProxy), "", "auto-add")
} else {
logger.Info("自动补号: 未使用代理", "", "auto-add")
}
// 初始化状态
teamProcessState.Running = true
teamProcessState.StartedAt = time.Now()
teamProcessState.TotalTeams = len(reqOwners)
teamProcessState.Completed = 0
teamProcessState.Results = make([]TeamProcessResult, 0, len(reqOwners))
lastAutoAddTime = time.Now()
// 异步执行
go runTeamProcess(req)
logger.Success(fmt.Sprintf("自动补号任务已启动: %d 个 Team, 每 Team %d 成员, 并发 %d (浏览器: %s)",
actualTeams, membersPerTeam, concurrentTeams, browserType), "", "auto-add")
}
// getS2AAccountCount 获取 S2A 当前账号数量
func getS2AAccountCount() (int, error) {
if config.Global == nil || config.Global.S2AApiBase == "" {
return 0, fmt.Errorf("S2A 配置未设置")
}
url := config.Global.S2AApiBase + "/api/v1/admin/dashboard/stats"
req, err := http.NewRequest("GET", url, nil)
if err != nil {
return 0, err
}
// 设置认证头(与代理保持一致)
adminKey := config.Global.S2AAdminKey
req.Header.Set("Authorization", "Bearer "+adminKey)
req.Header.Set("X-API-Key", adminKey)
req.Header.Set("X-Admin-Key", adminKey)
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return 0, fmt.Errorf("S2A 返回状态: %d", resp.StatusCode)
}
var result struct {
Code int `json:"code"`
Message string `json:"message"`
Data struct {
NormalAccounts int `json:"normal_accounts"`
} `json:"data"`
}
if err := decodeJSON(resp.Body, &result); err != nil {
return 0, err
}
// S2A 返回 code=0 表示成功
if result.Code != 0 {
return 0, fmt.Errorf("S2A 返回错误: %s", result.Message)
}
return result.Data.NormalAccounts, nil
}
// tryAutoRegisterOwners 尝试自动注册母号
// count: 需要注册的数量
// currentOwners: 当前可用的母号数量(用于日志)
func tryAutoRegisterOwners(count int, currentOwners int) {
if database.Instance == nil {
return
}
// 检查是否开启自动注册
autoRegEnabled := false
if val, _ := database.Instance.GetConfig("monitor_auto_register"); val == "true" {
autoRegEnabled = true
}
if !autoRegEnabled {
logger.Info("自动注册未开启,跳过母号注册", "", "auto-add")
return
}
// 检查是否有注册任务在运行
autoAddMu.Lock()
if autoRegisterRunning {
autoAddMu.Unlock()
logger.Info("已有自动注册任务在运行,跳过", "", "auto-add")
return
}
// 检查 Team 注册是否在运行
teamRegState.mu.Lock()
teamRegRunning := teamRegState.Running
teamRegState.mu.Unlock()
if teamRegRunning {
autoAddMu.Unlock()
logger.Info("Team 注册任务正在运行,跳过自动注册", "", "auto-add")
return
}
// 检查距离上次自动注册的间隔(至少 5 分钟)
if time.Since(lastAutoRegTime) < 5*time.Minute {
autoAddMu.Unlock()
remaining := 5*time.Minute - time.Since(lastAutoRegTime)
logger.Info(fmt.Sprintf("自动注册: 冷却中 (还需 %.0f 秒)", remaining.Seconds()), "", "auto-add")
return
}
autoRegisterRunning = true
lastAutoRegTime = time.Now()
autoAddMu.Unlock()
// 读取自动注册配置
concurrency := 2
if val, _ := database.Instance.GetConfig("monitor_auto_reg_concurrency"); val != "" {
if v, err := strconv.Atoi(val); err == nil && v >= 1 && v <= 10 {
concurrency = v
}
}
useProxy := false
if val, _ := database.Instance.GetConfig("monitor_auto_reg_use_proxy"); val == "true" {
useProxy = true
}
// 获取代理地址
proxyURL := ""
if useProxy && config.Global != nil {
proxyURL = config.Global.DefaultProxy
}
// 限制单次注册数量(最多 20 个)
if count > 20 {
count = 20
}
if count < 1 {
count = 1
}
logger.Info(fmt.Sprintf("自动注册: 当前母号 %d 个不足,将注册 %d 个新账号 (并发: %d, 代理: %v)",
currentOwners, count, concurrency, useProxy), "", "auto-add")
// 构建注册配置
regConfig := TeamRegConfig{
Count: count,
Concurrency: concurrency,
Proxy: proxyURL,
AutoImport: true, // 自动导入到数据库
}
// 异步执行注册
go func() {
defer func() {
autoAddMu.Lock()
autoRegisterRunning = false
autoAddMu.Unlock()
}()
// 调用 Team 注册流程
runTeamRegProcess(regConfig)
logger.Info("自动注册任务已完成", "", "auto-add")
}()
}