diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 2ac3e41..d5efea8 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -78,6 +78,10 @@ func main() { } fmt.Println() + // 启动自动补号服务 + api.StartAutoAddService() + fmt.Printf("%s[服务]%s 自动补号服务已启动\n", colorGreen, colorReset) + // 启动服务器 startServer(cfg) } diff --git a/backend/internal/api/auto_add.go b/backend/internal/api/auto_add.go new file mode 100644 index 0000000..6ef0584 --- /dev/null +++ b/backend/internal/api/auto_add.go @@ -0,0 +1,209 @@ +package api + +import ( + "fmt" + "net/http" + "strconv" + "sync" + "time" + + "codex-pool/internal/config" + "codex-pool/internal/database" + "codex-pool/internal/logger" +) + +var ( + autoAddRunning bool + autoAddMu sync.Mutex + autoAddStopChan chan struct{} + lastAutoAddTime time.Time +) + +// StartAutoAddService 启动自动补号服务 +func StartAutoAddService() { + autoAddMu.Lock() + if autoAddRunning { + autoAddMu.Unlock() + return + } + autoAddRunning = true + autoAddStopChan = make(chan struct{}) + autoAddMu.Unlock() + + logger.Info("自动补号服务已启动", "", "auto-add") + + go func() { + ticker := time.NewTicker(60 * time.Second) // 每分钟检查一次 + defer ticker.Stop() + + for { + select { + case <-autoAddStopChan: + logger.Info("自动补号服务已停止", "", "auto-add") + return + case <-ticker.C: + checkAndAutoAdd() + } + } + }() +} + +// StopAutoAddService 停止自动补号服务 +func StopAutoAddService() { + autoAddMu.Lock() + defer autoAddMu.Unlock() + + if autoAddRunning && autoAddStopChan != nil { + close(autoAddStopChan) + autoAddRunning = false + } +} + +// checkAndAutoAdd 检查并自动补号 +func checkAndAutoAdd() { + if database.Instance == nil { + return + } + + // 读取配置 + autoAddEnabled := false + if val, _ := database.Instance.GetConfig("monitor_auto_add"); val == "true" { + autoAddEnabled = true + } + if !autoAddEnabled { + return + } + + // 检查最小间隔 + minInterval := 300 + if val, _ := database.Instance.GetConfig("monitor_min_interval"); val != "" { + if v, err := strconv.Atoi(val); err == nil { + minInterval = v + } + } + + if time.Since(lastAutoAddTime).Seconds() < float64(minInterval) { + return + } + + // 检查是否有任务在运行 + if teamProcessState.Running { + logger.Info("已有任务在运行,跳过自动补号", "", "auto-add") + return + } + + // 获取目标数量 + target := 50 + if val, _ := database.Instance.GetConfig("monitor_target"); val != "" { + if v, err := strconv.Atoi(val); err == nil { + target = v + } + } + + // 获取当前 S2A 账号数量 + current, err := getS2AAccountCount() + if err != nil { + logger.Error(fmt.Sprintf("获取 S2A 账号数量失败: %v", err), "", "auto-add") + return + } + + deficit := target - current + if deficit <= 0 { + return + } + + // 计算需要多少个 Team(每个 Team 产生 4 个账号) + teamsNeeded := (deficit + 3) / 4 // 向上取整 + + // 获取可用的 Owner + owners, err := database.Instance.GetPendingOwners() + if err != nil { + logger.Error(fmt.Sprintf("获取可用 Owner 失败: %v", err), "", "auto-add") + return + } + + if len(owners) == 0 { + logger.Warning("没有可用的 Owner 账号,无法自动补号", "", "auto-add") + return + } + + // 限制使用的 Owner 数量 + actualTeams := teamsNeeded + if actualTeams > len(owners) { + logger.Warning(fmt.Sprintf("可用 Owner 不足: 需要 %d 个, 仅有 %d 个可用", teamsNeeded, len(owners)), "", "auto-add") + actualTeams = len(owners) + } + + logger.Info(fmt.Sprintf("自动补号: 当前 %d, 目标 %d, 缺少 %d, 需要 %d 个 Team, 将处理 %d 个", + current, target, deficit, teamsNeeded, actualTeams), "", "auto-add") + + // 构建请求 - 直接使用 database.TeamOwner 转换 + reqOwners := make([]TeamOwner, actualTeams) + for i := 0; i < actualTeams; i++ { + reqOwners[i] = TeamOwner{ + Email: owners[i].Email, + Password: owners[i].Password, + Token: owners[i].Token, + AccountID: owners[i].AccountID, + } + } + + req := TeamProcessRequest{ + Owners: reqOwners, + MembersPerTeam: 4, + ConcurrentTeams: 2, + IncludeOwner: false, + Headless: true, + BrowserType: "rod", // 默认使用 rod + Proxy: "", // 不使用代理 + } + + // 初始化状态 + teamProcessState.Running = true + teamProcessState.StartedAt = time.Now() + teamProcessState.TotalTeams = len(reqOwners) + teamProcessState.Completed = 0 + teamProcessState.Results = make([]TeamProcessResult, 0, len(reqOwners)) + + lastAutoAddTime = time.Now() + + // 异步执行 + go runTeamProcess(req) + + logger.Success(fmt.Sprintf("自动补号任务已启动: %d 个 Team", actualTeams), "", "auto-add") +} + +// getS2AAccountCount 获取 S2A 当前账号数量 +func getS2AAccountCount() (int, error) { + if config.Global == nil || config.Global.S2AApiBase == "" { + return 0, fmt.Errorf("S2A 配置未设置") + } + + url := config.Global.S2AApiBase + "/api/v1/admin/dashboard/stats" + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + req.Header.Set("Authorization", "Bearer "+config.Global.S2AAdminKey) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return 0, fmt.Errorf("S2A 返回状态: %d", resp.StatusCode) + } + + var result struct { + NormalAccounts int `json:"normal_accounts"` + } + + if err := decodeJSON(resp.Body, &result); err != nil { + return 0, err + } + + return result.NormalAccounts, nil +} diff --git a/backend/internal/api/http.go b/backend/internal/api/http.go index 0fccfef..29ab523 100644 --- a/backend/internal/api/http.go +++ b/backend/internal/api/http.go @@ -52,3 +52,17 @@ func CORS(next http.HandlerFunc) http.HandlerFunc { next(w, r) } } + +// decodeJSON 解析 JSON 响应体 +func decodeJSON(body interface{}, v interface{}) error { + switch b := body.(type) { + case []byte: + return json.Unmarshal(b, v) + default: + // 假设是 io.Reader + if reader, ok := body.(interface{ Read([]byte) (int, error) }); ok { + return json.NewDecoder(reader).Decode(v) + } + return json.Unmarshal(body.([]byte), v) + } +} diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index 96289c1..48104ab 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "strings" "sync" "sync/atomic" "time" @@ -17,15 +18,18 @@ import ( "codex-pool/internal/register" ) +// TeamOwner 团队母号信息 +type TeamOwner struct { + Email string `json:"email"` + Password string `json:"password"` + Token string `json:"token"` + AccountID string `json:"account_id"` // 已存储的 account_id,如有则直接使用 +} + // TeamProcessRequest 团队处理请求 type TeamProcessRequest struct { // Owner 账号列表 - Owners []struct { - Email string `json:"email"` - Password string `json:"password"` - Token string `json:"token"` - AccountID string `json:"account_id"` // 已存储的 account_id,如有则直接使用 - } `json:"owners"` + Owners []TeamOwner `json:"owners"` // 配置 MembersPerTeam int `json:"members_per_team"` // 每个 Team 的成员数 ConcurrentTeams int `json:"concurrent_teams"` // 并发 Team 数量 @@ -33,6 +37,7 @@ type TeamProcessRequest struct { Headless bool `json:"headless"` // 是否无头模式 Proxy string `json:"proxy"` // 代理设置 IncludeOwner bool `json:"include_owner"` // 母号也入库到 S2A + ProcessCount int `json:"process_count"` // 处理数量,0表示全部 } // TeamProcessResult 团队处理结果 @@ -91,12 +96,7 @@ func HandleTeamProcess(w http.ResponseWriter, r *http.Request) { } // 转换为请求格式(包含已存储的 account_id) for _, o := range pendingOwners { - req.Owners = append(req.Owners, struct { - Email string `json:"email"` - Password string `json:"password"` - Token string `json:"token"` - AccountID string `json:"account_id"` - }{ + req.Owners = append(req.Owners, TeamOwner{ Email: o.Email, Password: o.Password, Token: o.Token, @@ -106,6 +106,12 @@ func HandleTeamProcess(w http.ResponseWriter, r *http.Request) { logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team") } + // 根据 ProcessCount 限制处理数量 + if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) { + req.Owners = req.Owners[:req.ProcessCount] + logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team") + } + if req.MembersPerTeam <= 0 { req.MembersPerTeam = 4 } @@ -330,7 +336,28 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } result.TeamID = teamID - // Step 2: 并发注册成员 + // Step 2: 测试邀请功能(检测 Team 是否被封禁) + testEmail := mail.GenerateEmail() + if err := inviter.SendInvites([]string{testEmail}); err != nil { + // 邀请失败,可能是 Team 被封禁 + errStr := err.Error() + if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") || + strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") || + strings.Contains(errStr, "deactivated") { + // Team 被封禁,标记为 invalid + logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team") + if database.Instance != nil { + database.Instance.MarkOwnerAsInvalid(owner.Email) + } + result.Errors = append(result.Errors, "Team 被封禁") + result.DurationMs = time.Since(startTime).Milliseconds() + return result + } + // 其他邀请错误,继续尝试 + logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team") + } + + // Step 3: 并发注册成员 // 每个成员:邀请 → 注册,失败重试1次 // Team 有4次额外补救机会 type MemberAccount struct { diff --git a/backend/internal/database/sqlite.go b/backend/internal/database/sqlite.go index 3334c7b..80a8419 100644 --- a/backend/internal/database/sqlite.go +++ b/backend/internal/database/sqlite.go @@ -253,6 +253,12 @@ func (d *DB) MarkOwnerAsFailed(email string) error { return err } +// MarkOwnerAsInvalid 标记 Owner 为无效(Team 被封禁,永久跳过) +func (d *DB) MarkOwnerAsInvalid(email string) error { + _, err := d.db.Exec("UPDATE team_owners SET status = 'invalid' WHERE email = ?", email) + return err +} + // UpdateOwnerStatus 更新状态 func (d *DB) UpdateOwnerStatus(id int64, status string) error { _, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id) diff --git a/frontend/src/pages/Monitor.tsx b/frontend/src/pages/Monitor.tsx index 6a9852e..5033ae6 100644 --- a/frontend/src/pages/Monitor.tsx +++ b/frontend/src/pages/Monitor.tsx @@ -89,29 +89,6 @@ export default function Monitor() { return json } - // 获取号池状态 - 使用 S2A 的 dashboard/stats 接口 - const fetchPoolStatus = useCallback(async () => { - try { - const data = await requestS2A('/dashboard/stats') - if (data) { - // 从 dashboard/stats 构建 pool status - setPoolStatus({ - current: data.normal_accounts || 0, - target: targetInput, - deficit: Math.max(0, targetInput - (data.normal_accounts || 0)), - auto_add: autoAdd, - min_interval: minInterval, - polling_enabled: pollingEnabled, - polling_interval: pollingInterval, - }) - // 同时更新统计数据 - setStats(data) - } - } catch (e) { - console.error('获取号池状态失败:', e) - } - }, [targetInput, autoAdd, minInterval, pollingEnabled, pollingInterval]) - // 刷新 S2A 统计 - 使用 dashboard/stats const refreshStats = useCallback(async () => { setRefreshing(true) @@ -277,27 +254,59 @@ export default function Monitor() { const json = await res.json() if (json.code === 0 && json.data) { const s = json.data - setTargetInput(s.target || 50) - setAutoAdd(s.auto_add || false) - setMinInterval(s.min_interval || 300) - setPollingEnabled(s.polling_enabled || false) + const target = s.target || 50 + const autoAddVal = s.auto_add || false + const minIntervalVal = s.min_interval || 300 + const pollingEnabledVal = s.polling_enabled || false const interval = s.polling_interval || 60 + + setTargetInput(target) + setAutoAdd(autoAddVal) + setMinInterval(minIntervalVal) + setPollingEnabled(pollingEnabledVal) setPollingInterval(interval) - savedPollingIntervalRef.current = interval // 同步更新 ref + savedPollingIntervalRef.current = interval setCountdown(interval) + + // 返回加载的配置用于后续刷新 + return { target, autoAdd: autoAddVal, minInterval: minIntervalVal, pollingEnabled: pollingEnabledVal, pollingInterval: interval } } } } catch (e) { console.error('加载监控设置失败:', e) } + return null } // 初始化 - 只在组件挂载时执行一次 useEffect(() => { - loadMonitorSettings() - fetchPoolStatus() - refreshStats() - fetchAutoAddLogs() + const init = async () => { + // 先加载设置 + const settings = await loadMonitorSettings() + + // 然后刷新状态(使用加载的设置值) + try { + const data = await requestS2A('/dashboard/stats') + if (data) { + const target = settings?.target || 50 + setStats(data) + setPoolStatus({ + current: data.normal_accounts || 0, + target: target, + deficit: Math.max(0, target - (data.normal_accounts || 0)), + auto_add: settings?.autoAdd || false, + min_interval: settings?.minInterval || 300, + polling_enabled: settings?.pollingEnabled || false, + polling_interval: settings?.pollingInterval || 60, + }) + } + } catch (e) { + console.error('获取号池状态失败:', e) + } + + fetchAutoAddLogs() + } + init() // eslint-disable-next-line react-hooks/exhaustive-deps }, []) diff --git a/frontend/src/pages/Upload.tsx b/frontend/src/pages/Upload.tsx index 15ededb..a3ed17d 100644 --- a/frontend/src/pages/Upload.tsx +++ b/frontend/src/pages/Upload.tsx @@ -72,6 +72,7 @@ export default function Upload() { const [browserType, setBrowserType] = useState<'chromedp' | 'rod'>('chromedp') const [proxy, setProxy] = useState('') const [includeOwner, setIncludeOwner] = useState(false) // 母号也入库 + const [processCount, setProcessCount] = useState(0) // 处理数量,0表示全部 const hasConfig = config.s2a.apiBase && config.s2a.adminKey @@ -177,6 +178,7 @@ export default function Upload() { headless: true, // 始终使用无头模式 proxy, include_owner: includeOwner, // 母号也入库 + process_count: processCount, // 处理数量,0表示全部 }), }) @@ -192,7 +194,7 @@ export default function Upload() { alert('启动失败') } setLoading(false) - }, [stats, membersPerTeam, concurrentTeams, browserType, proxy, includeOwner, fetchStatus]) + }, [stats, membersPerTeam, concurrentTeams, browserType, proxy, includeOwner, processCount, fetchStatus]) // 停止处理 const handleStop = useCallback(async () => { @@ -371,6 +373,37 @@ export default function Upload() { + {/* 处理数量设置 */} +
+ +
+ setProcessCount(Number(e.target.value) || 0)} + disabled={isRunning} + placeholder="输入数量" + className="flex-1" + /> + +
+

+ {processCount > 0 ? `将处理 ${Math.min(processCount, stats?.valid || 0)} 个母号` : `将处理全部 ${stats?.valid || 0} 个母号`} +

+
+