package api import ( "encoding/json" "fmt" "math/rand" "net/http" "strings" "sync" "sync/atomic" "time" "codex-pool/internal/auth" "codex-pool/internal/config" "codex-pool/internal/database" "codex-pool/internal/demote" "codex-pool/internal/invite" "codex-pool/internal/logger" "codex-pool/internal/mail" "codex-pool/internal/proxyutil" "codex-pool/internal/register" ) // TeamOwner 团队母号信息 type TeamOwner struct { Email string `json:"email"` Password string `json:"password"` Token string `json:"token"` AccountID string `json:"account_id"` // 已存储的 account_id,如有则直接使用 } // TeamProcessRequest 团队处理请求 type TeamProcessRequest struct { // Owner 账号列表 Owners []TeamOwner `json:"owners"` // 配置 MembersPerTeam int `json:"members_per_team"` // 每个 Team 的成员数 ConcurrentTeams int `json:"concurrent_teams"` // 并发 Team 数量 ConcurrentS2A int `json:"concurrent_s2a"` // 入库并发数(默认2) BrowserType string `json:"browser_type"` // "chromedp" 或 "rod" Headless bool `json:"headless"` // 是否无头模式 Proxy string `json:"proxy"` // 代理设置 IncludeOwner bool `json:"include_owner"` // 母号也入库到 S2A ProcessCount int `json:"process_count"` // 处理数量,0表示全部 } // TeamProcessResult 团队处理结果 type TeamProcessResult struct { TeamIndex int `json:"team_index"` OwnerEmail string `json:"owner_email"` TeamID string `json:"team_id"` Registered int `json:"registered"` AddedToS2A int `json:"added_to_s2a"` MemberEmails []string `json:"member_emails"` Errors []string `json:"errors"` DurationMs int64 `json:"duration_ms"` } // TeamProcessState 处理状态 type TeamProcessState struct { Running bool `json:"running"` StartedAt time.Time `json:"started_at"` TotalTeams int `json:"total_teams"` Completed int32 `json:"completed"` Results []TeamProcessResult `json:"results"` mu sync.Mutex stopCh chan struct{} // 停止信号 channel,close 后所有 select 立即返回 } var teamProcessState = &TeamProcessState{} // isStopped 检查是否已收到停止信号(非阻塞) func isStopped() bool { select { case <-teamProcessState.stopCh: return true default: return false } } // waitGroupWithTimeout 带超时的 WaitGroup 等待,超时返回 false func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { done := make(chan struct{}) go func() { wg.Wait(); close(done) }() select { case <-done: return true case <-teamProcessState.stopCh: return false case <-time.After(timeout): return false } } // getProxyDisplay 获取代理显示名称(隐藏密码) func getProxyDisplay(proxy string) string { if proxy == "" { return "无代理" } // 尝试解析 URL,只返回 host 部分 if strings.Contains(proxy, "@") { parts := strings.Split(proxy, "@") if len(parts) >= 2 { return parts[len(parts)-1] // 返回 @ 后面的 host:port 部分 } } // 去掉协议前缀 proxy = strings.TrimPrefix(proxy, "http://") proxy = strings.TrimPrefix(proxy, "https://") proxy = strings.TrimPrefix(proxy, "socks5://") return proxy } // HandleTeamProcess POST /api/team/process - 启动 Team 批量处理 func HandleTeamProcess(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { Error(w, http.StatusMethodNotAllowed, "仅支持 POST") return } // 检查是否正在运行 if teamProcessState.Running { Error(w, http.StatusConflict, "已有任务正在运行") return } var req TeamProcessRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { Error(w, http.StatusBadRequest, "请求格式错误") return } // 如果没有传入 owners,从数据库获取待处理的母号 if len(req.Owners) == 0 { pendingOwners, err := database.Instance.GetPendingOwners() if err != nil { Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err)) return } if len(pendingOwners) == 0 { Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件") return } // 转换为请求格式(包含已存储的 account_id) for _, o := range pendingOwners { req.Owners = append(req.Owners, TeamOwner{ Email: o.Email, Password: o.Password, Token: o.Token, AccountID: o.AccountID, // 直接使用数据库中存储的 account_id }) } logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team") } // 根据 ProcessCount 限制处理数量 if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) { req.Owners = req.Owners[:req.ProcessCount] logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team") } if req.MembersPerTeam <= 0 { req.MembersPerTeam = 4 } if req.ConcurrentTeams <= 0 { req.ConcurrentTeams = len(req.Owners) } if req.ConcurrentTeams > len(req.Owners) { req.ConcurrentTeams = len(req.Owners) } if req.BrowserType == "" { req.BrowserType = "chromedp" // 默认使用 Chromedp } if req.ConcurrentS2A <= 0 { req.ConcurrentS2A = 2 // 默认入库并发数为 2 } if req.ConcurrentS2A > 4 { req.ConcurrentS2A = 4 // 最大入库并发数为 4(避免浏览器资源耗尽) } if req.Proxy == "" && config.Global != nil { req.Proxy = config.Global.GetProxy() // 使用新的代理获取方法 } if req.Proxy != "" { // 如果是模式字符串(pool:random, pool:id:N),跳过 Normalize if !strings.HasPrefix(req.Proxy, "pool:") && req.Proxy != "[RANDOM]" { normalized, err := proxyutil.Normalize(req.Proxy) if err != nil { Error(w, http.StatusBadRequest, fmt.Sprintf("代理格式错误: %v", err)) return } req.Proxy = normalized } } // 启动新任务前,先清理数据库中遗留的 running 状态批次 if database.Instance != nil { if affected, err := database.Instance.CleanupStuckBatchRuns(); err == nil && affected > 0 { logger.Warning(fmt.Sprintf("清理了 %d 个遗留的运行中批次记录", affected), "", "team") } } // 初始化状态 teamProcessState.Running = true teamProcessState.stopCh = make(chan struct{}) teamProcessState.StartedAt = time.Now() teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理 teamProcessState.Completed = 0 teamProcessState.Results = make([]TeamProcessResult, 0, len(req.Owners)) // 异步执行 go runTeamProcess(req) Success(w, map[string]interface{}{ "message": "任务已启动", "total_teams": len(req.Owners), "concurrent_teams": req.ConcurrentTeams, "started_at": teamProcessState.StartedAt, }) } // HandleTeamProcessStatus GET /api/team/status - 获取处理状态 func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { Error(w, http.StatusMethodNotAllowed, "仅支持 GET") return } teamProcessState.mu.Lock() defer teamProcessState.mu.Unlock() Success(w, map[string]interface{}{ "running": teamProcessState.Running, "started_at": teamProcessState.StartedAt, "total_teams": teamProcessState.TotalTeams, "completed": teamProcessState.Completed, "results": teamProcessState.Results, "elapsed_ms": time.Since(teamProcessState.StartedAt).Milliseconds(), }) } // HandleTeamProcessStop POST /api/team/stop - 停止处理 func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { Error(w, http.StatusMethodNotAllowed, "仅支持 POST") return } teamProcessState.Running = false // 关闭 stopCh 广播停止信号,所有 select 监听的 goroutine 立即唤醒 if teamProcessState.stopCh != nil { select { case <-teamProcessState.stopCh: // 已经关闭过了,不重复关闭 default: close(teamProcessState.stopCh) } } logger.Warning("收到停止信号,正在终止所有处理...", "", "team") Success(w, map[string]string{"message": "已发送停止信号"}) } // HandleBatchHistory GET /api/batch/history - 获取历史批次(分页) func HandleBatchHistory(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { Error(w, http.StatusMethodNotAllowed, "仅支持 GET") return } if database.Instance == nil { Error(w, http.StatusInternalServerError, "数据库未初始化") return } // 获取分页参数 page := 1 pageSize := 5 if p := r.URL.Query().Get("page"); p != "" { if v, err := fmt.Sscanf(p, "%d", &page); err == nil && v > 0 { // page已设置 } } if ps := r.URL.Query().Get("page_size"); ps != "" { if v, err := fmt.Sscanf(ps, "%d", &pageSize); err == nil && v > 0 { // pageSize已设置 } } runs, total, err := database.Instance.GetBatchRunsWithPagination(page, pageSize) if err != nil { Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err)) return } totalPages := (total + pageSize - 1) / pageSize Success(w, map[string]interface{}{ "runs": runs, "total": total, "page": page, "page_size": pageSize, "total_pages": totalPages, }) } // HandleBatchDetail GET /api/batch/detail?id=xxx - 获取批次详情(包含Team结果) func HandleBatchDetail(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { Error(w, http.StatusMethodNotAllowed, "仅支持 GET") return } if database.Instance == nil { Error(w, http.StatusInternalServerError, "数据库未初始化") return } idStr := r.URL.Query().Get("id") if idStr == "" { Error(w, http.StatusBadRequest, "缺少批次ID") return } var batchID int64 if _, err := fmt.Sscanf(idStr, "%d", &batchID); err != nil { Error(w, http.StatusBadRequest, "无效的批次ID") return } run, results, err := database.Instance.GetBatchRunWithResults(batchID) if err != nil { Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err)) return } Success(w, map[string]interface{}{ "batch": run, "results": results, }) } // runTeamProcess 执行 Team 批量处理 - 使用工作池模式 func runTeamProcess(req TeamProcessRequest) { totalOwners := len(req.Owners) workerCount := req.ConcurrentTeams // 同时运行的 worker 数量 if workerCount > totalOwners { workerCount = totalOwners } if workerCount <= 0 { workerCount = 2 // 默认 2 个并发 } // 创建批次记录 var batchID int64 if database.Instance != nil { var err error batchID, err = database.Instance.CreateBatchRun(totalOwners) if err != nil { logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team") } } // 统计变量(在 defer 中使用) var totalRegistered, totalAddedToS2A int var allErrors []string // 确保任务结束时更新状态和批次记录 defer func() { teamProcessState.Running = false // 清理残留的 processing 状态母号,重置为 valid if database.Instance != nil { if affected, err := database.Instance.CleanupStuckProcessingOwners(); err == nil && affected > 0 { logger.Warning(fmt.Sprintf("批次结束,重置了 %d 个残留的处理中母号为有效状态", affected), "", "team") } } // 无论任务是正常完成还是异常中断,都更新批次记录状态 if database.Instance != nil && batchID > 0 { errorsStr := "" if len(allErrors) > 0 { // 只保留前10条错误 if len(allErrors) > 10 { allErrors = allErrors[:10] } errorsStr = fmt.Sprintf("%v", allErrors) } if err := database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr); err != nil { logger.Error(fmt.Sprintf("更新批次记录失败: %v", err), "", "team") } } }() logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team") // 任务队列 taskChan := make(chan int, totalOwners) resultChan := make(chan TeamProcessResult, totalOwners) // 连续入库为0的暂停机制 var pauseMu sync.Mutex pauseCond := sync.NewCond(&pauseMu) paused := false var wg sync.WaitGroup // 启动 worker for w := 0; w < workerCount; w++ { wg.Add(1) go func(workerID int) { defer wg.Done() for idx := range taskChan { if isStopped() { return } // 检查是否处于暂停状态,等待恢复 pauseMu.Lock() for paused { pauseCond.Wait() } pauseMu.Unlock() result := processSingleTeam(idx, req) resultChan <- result atomic.AddInt32(&teamProcessState.Completed, 1) } }(w) } // 发送任务 go func() { for i := 0; i < totalOwners; i++ { select { case <-teamProcessState.stopCh: break case taskChan <- i: } if isStopped() { break } } close(taskChan) }() // 等待完成并收集结果 go func() { wg.Wait() close(resultChan) }() // 连续入库为0的计数器 consecutiveZeroS2A := 0 // 收集结果并统计 for result := range resultChan { teamProcessState.mu.Lock() teamProcessState.Results = append(teamProcessState.Results, result) teamProcessState.mu.Unlock() totalRegistered += result.Registered totalAddedToS2A += result.AddedToS2A allErrors = append(allErrors, result.Errors...) // 检测连续入库为0的情况 if result.AddedToS2A == 0 { consecutiveZeroS2A++ if consecutiveZeroS2A >= 2 { logger.Warning(fmt.Sprintf("⚠ 连续 %d 个 Team 入库数为 0 (最近: Team %d - %s),暂停 1 分钟后继续...", consecutiveZeroS2A, result.TeamIndex, result.OwnerEmail), "", "team") // 设置暂停标志,阻止 worker 取新任务 pauseMu.Lock() paused = true pauseMu.Unlock() // 等待 1 分钟(可被停止信号中断) select { case <-time.After(1 * time.Minute): logger.Info("暂停结束,恢复批量处理...", "", "team") case <-teamProcessState.stopCh: logger.Warning("暂停期间收到停止信号,终止处理", "", "team") } // 恢复 worker pauseMu.Lock() paused = false pauseMu.Unlock() pauseCond.Broadcast() // 重置计数器 consecutiveZeroS2A = 0 } } else { // 入库不为0,重置连续计数 consecutiveZeroS2A = 0 } // 保存单个Team结果到数据库 if database.Instance != nil && batchID > 0 { dbResult := database.BatchTeamResult{ TeamIndex: result.TeamIndex, OwnerEmail: result.OwnerEmail, TeamID: result.TeamID, Registered: result.Registered, AddedToS2A: result.AddedToS2A, MemberEmails: result.MemberEmails, Errors: result.Errors, DurationMs: result.DurationMs, } if err := database.Instance.SaveBatchTeamResult(batchID, dbResult); err != nil { logger.Error(fmt.Sprintf("保存Team结果失败: %v", err), result.OwnerEmail, "team") } } } // 计算成功率 expectedTotal := totalOwners * req.MembersPerTeam successRate := float64(0) if expectedTotal > 0 { successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100 } logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%", teamProcessState.Completed, totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team") } // processSingleTeam 处理单个 Team func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResult) { startTime := time.Now() owner := req.Owners[idx] result = TeamProcessResult{ TeamIndex: idx + 1, OwnerEmail: owner.Email, MemberEmails: make([]string, 0), Errors: make([]string, 0), } // 固定宽度的 Team 编号 (支持到 Team 99) logPrefix := fmt.Sprintf("[Team %2d]", idx+1) // panic 恢复,防止单个 Team 处理崩溃影响整个任务 defer func() { if r := recover(); r != nil { logger.Error(fmt.Sprintf("%s 处理异常: %v", logPrefix, r), owner.Email, "team") result.Errors = append(result.Errors, fmt.Sprintf("处理异常: %v", r)) result.DurationMs = time.Since(startTime).Milliseconds() // 恢复为 valid 允许重试 if database.Instance != nil { database.Instance.MarkOwnerAsFailed(owner.Email) } } }() logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team") // 标记 owner 为处理中 if database.Instance != nil { database.Instance.MarkOwnerAsProcessing(owner.Email) } // 处理失败时的清理函数 markOwnerResult := func(success bool) { if database.Instance != nil { if success { database.Instance.MarkOwnerAsUsed(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s 母号已使用并删除: %s", logPrefix, owner.Email), owner.Email, "team") // 如果开启了母号降级,尝试降级 if config.Global != nil && config.Global.DemoteAfterUse { go func() { logger.Info(fmt.Sprintf("%s 尝试降级母号...", logPrefix), owner.Email, "team") result := demote.DemoteOwner(demote.DemoteRequest{ AccessToken: owner.Token, AccountID: owner.AccountID, Role: "standard-user", Proxy: req.Proxy, }) if result.Success { logger.Success(fmt.Sprintf("%s 母号已降级为普通成员", logPrefix), owner.Email, "team") } else { logger.Warning(fmt.Sprintf("%s 母号降级失败: %s", logPrefix, result.Error), owner.Email, "team") } }() } } else { // 失败时恢复为 valid,允许重试 database.Instance.MarkOwnerAsFailed(owner.Email) } } } // Step 0: 预检查账户状态(封禁/Token过期检测) resolvedProxy := database.Instance.ResolveProxy(req.Proxy) preChecker := invite.NewWithProxy(owner.Token, resolvedProxy) accountStatus := preChecker.CheckAccountStatus() if accountStatus.Status == "banned" { logger.Error(fmt.Sprintf("%s 母号被封禁,跳过处理: %s", logPrefix, accountStatus.Error), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsInvalid(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s 封禁母号已删除: %s", logPrefix, owner.Email), owner.Email, "team") } result.Errors = append(result.Errors, "账户被封禁") result.DurationMs = time.Since(startTime).Milliseconds() return result } if accountStatus.Status == "token_expired" { logger.Error(fmt.Sprintf("%s Token已过期,跳过处理", logPrefix), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsInvalid(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s Token过期母号已删除: %s", logPrefix, owner.Email), owner.Email, "team") } result.Errors = append(result.Errors, "Token已过期") result.DurationMs = time.Since(startTime).Milliseconds() return result } if accountStatus.Status == "error" { logger.Warning(fmt.Sprintf("%s 账户状态检查失败: %s,继续尝试", logPrefix, accountStatus.Error), owner.Email, "team") } else if accountStatus.PlanType == "free" { logger.Warning(fmt.Sprintf("%s 母号 plan 为 free,非 Team 账户,移除", logPrefix), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsInvalid(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s free 母号已删除: %s", logPrefix, owner.Email), owner.Email, "team") } result.Errors = append(result.Errors, "账户 plan 为 free,非 Team 账户") result.DurationMs = time.Since(startTime).Milliseconds() return result } else { logger.Info(fmt.Sprintf("%s 账户状态正常: %s", logPrefix, accountStatus.PlanType), owner.Email, "team") } // Step 1: 获取 Team ID(优先使用已存储的 account_id) var teamID string inviter := invite.NewWithProxy(owner.Token, resolvedProxy) if owner.AccountID != "" { // 直接使用数据库中存储的 account_id teamID = owner.AccountID inviter.SetAccountID(teamID) // 必须设置到 inviter 中 logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team") } else { // 如果没有存储,才请求 API 获取 var err error teamID, err = inviter.GetAccountID() if err != nil { errStr := err.Error() result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err)) result.DurationMs = time.Since(startTime).Milliseconds() logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team") // Token 过期或无效,标记为 invalid 不再重试 if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") || strings.Contains(errStr, "unauthorized") || strings.Contains(errStr, "invalid") { logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsInvalid(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s 母号无效已删除: %s", logPrefix, owner.Email), owner.Email, "team") } return result } markOwnerResult(false) return result } logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team") } result.TeamID = teamID // Step 2: 测试邀请功能(检测 Team 是否被封禁或已满) testEmail := mail.GenerateEmail() if err := inviter.SendInvites([]string{testEmail}); err != nil { errStr := err.Error() // 检测 Team 已达邀请上限(401 或 maximum number of seats) if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") { logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsUsed(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s 母号已使用并删除(邀请已满): %s", logPrefix, owner.Email), owner.Email, "team") } result.Errors = append(result.Errors, "Team 邀请已满") result.DurationMs = time.Since(startTime).Milliseconds() return result } // 检测 Team 被封禁 if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") || strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") || strings.Contains(errStr, "deactivated") { logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsInvalid(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s 母号无效已删除(Team被封禁): %s", logPrefix, owner.Email), owner.Email, "team") } result.Errors = append(result.Errors, "Team 被封禁") result.DurationMs = time.Since(startTime).Milliseconds() return result } // 其他邀请错误,继续尝试 logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team") } // Step 3: 流水线模式 - 注册成功的成员立即开始入库 // 每个成员:邀请 → 注册 → 入库,失败重试1次 // Team 有4次额外补救机会 type MemberAccount struct { Email string Password string Success bool S2ADone bool // 入库是否完成 S2AOK bool // 入库是否成功 } children := make([]MemberAccount, req.MembersPerTeam) var memberMu sync.Mutex // 共享标志:Team 邀请已满,所有 goroutine 应停止 var teamExhausted int32 // 入库计数器 var s2aSuccessCount int32 var s2aFailCount int32 // Team 级别 500 错误熔断器:当同 Team 多个成员连续遇到 500 时快速失败 var consecutive500Fails int32 // 入库并发控制信号量 s2aSem := make(chan struct{}, req.ConcurrentS2A) // 检查 Team 是否已满的辅助函数 isTeamExhausted := func() bool { return atomic.LoadInt32(&teamExhausted) == 1 } // 标记 Team 已满 markTeamExhausted := func() { if atomic.CompareAndSwapInt32(&teamExhausted, 0, 1) { logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsUsed(owner.Email) database.Instance.DeleteTeamOwnerByEmail(owner.Email) logger.Info(fmt.Sprintf("%s 母号已使用并删除(Team耗尽): %s", logPrefix, owner.Email), owner.Email, "team") } } } // 入库单个成员的函数 doS2A := func(memberIdx int, memberEmail, memberPassword string) bool { memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1) memberStartTime := time.Now() // 检查停止信号 if isStopped() { return false } // 获取入库信号量(3分钟超时,可被停止信号中断) select { case s2aSem <- struct{}{}: case <-teamProcessState.stopCh: logger.Warning(fmt.Sprintf("%s 收到停止信号,跳过入库", memberLogPrefix), memberEmail, "team") return false case <-time.After(3 * time.Minute): logger.Warning(fmt.Sprintf("%s 入库信号量等待超时 (3分钟)", memberLogPrefix), memberEmail, "team") atomic.AddInt32(&s2aFailCount, 1) memberMu.Lock() result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库信号量超时", memberIdx+1)) memberMu.Unlock() return false } defer func() { <-s2aSem }() var s2aSuccess bool var lastError string for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 // 检查停止信号 if isStopped() { return false } // 熔断检查:同 Team 已有 3+ 个成员连续 500 失败,快速跳过 if atomic.LoadInt32(&consecutive500Fails) >= 3 { logger.Warning(fmt.Sprintf("%s 同 Team 已有多个成员连续 500 失败,跳过入库", memberLogPrefix), memberEmail, "team") atomic.AddInt32(&s2aFailCount, 1) memberMu.Lock() result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库跳过: 服务器持续 500 (熔断)", memberIdx+1)) memberMu.Unlock() return false } if attempt > 0 { // 重试前短暂等待(可被停止信号中断) retryDelay := time.Duration(3) * time.Second logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team") select { case <-time.After(retryDelay): case <-teamProcessState.stopCh: return false } } // 每次尝试都从代理池获取新代理(避免复用失败代理) proxyToUse, _ := database.Instance.GetRandomCodexProxy() proxyDisplay := "直连" if proxyToUse != "" { proxyDisplay = getProxyDisplay(proxyToUse) } if attempt == 0 { logger.Status(fmt.Sprintf("%s 入库中... | 邮箱: %s | 代理: %s", memberLogPrefix, memberEmail, proxyDisplay), memberEmail, "team") } else { logger.Status(fmt.Sprintf("%s 入库重试中... | 代理: %s", memberLogPrefix, proxyDisplay), memberEmail, "team") } // 创建日志回调 - 只显示关键步骤 authLogger := auth.NewAuthLogger(memberEmail, logPrefix, memberIdx+1, func(entry auth.AuthLogEntry) { if entry.IsError { logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, entry.Message), memberEmail, "team") } else { // 只显示关键步骤:提交邮箱、验证密码、选择工作区、授权成功 switch entry.Step { case auth.StepInputEmail: logger.Info(fmt.Sprintf("%s 提交邮箱: %s", memberLogPrefix, memberEmail), memberEmail, "team") case auth.StepInputPassword: logger.Info(fmt.Sprintf("%s 验证密码...", memberLogPrefix), memberEmail, "team") case auth.StepSelectWorkspace: logger.Info(fmt.Sprintf("%s 选择工作区: %s", memberLogPrefix, teamID), memberEmail, "team") case auth.StepComplete: logger.Info(fmt.Sprintf("%s 授权成功,获取到授权码", memberLogPrefix), memberEmail, "team") } } }) // 获取授权 URL s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) if err != nil { lastError = fmt.Sprintf("获取授权URL失败: %v", err) logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team") continue } // 根据配置选择授权方式 var code string if config.Global.AuthMethod == "api" { code, err = auth.CompleteWithCodexAPI(memberEmail, memberPassword, teamID, s2aResp.Data.AuthURL, s2aResp.Data.SessionID, proxyToUse, authLogger) if proxyToUse != "" { database.Instance.UpdateCodexProxyStats(proxyToUse, err == nil) } } else { code, err = auth.CompleteWithChromedpLogged(s2aResp.Data.AuthURL, memberEmail, memberPassword, teamID, req.Headless, req.Proxy, authLogger) } if err != nil { lastError = fmt.Sprintf("浏览器授权失败: %v", err) logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team") // 跟踪 500 错误用于熔断 if strings.Contains(err.Error(), "500") || strings.Contains(err.Error(), "重试已耗尽") { atomic.AddInt32(&consecutive500Fails, 1) } continue } // 授权成功,重置 500 计数 atomic.StoreInt32(&consecutive500Fails, 0) // 提交到 S2A _, err = auth.SubmitS2AOAuth( config.Global.S2AApiBase, config.Global.S2AAdminKey, s2aResp.Data.SessionID, code, "team-"+memberEmail, config.Global.Concurrency, config.Global.Priority, config.Global.GroupIDs, config.Global.ProxyID, ) if err != nil { lastError = fmt.Sprintf("S2A提交失败: %v", err) logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team") continue } s2aSuccess = true memberDuration := time.Since(memberStartTime) logger.Success(fmt.Sprintf("%s ✓ 入库成功 (总耗时: %.1fs)", memberLogPrefix, memberDuration.Seconds()), memberEmail, "team") break } if s2aSuccess { atomic.AddInt32(&s2aSuccessCount, 1) } else { atomic.AddInt32(&s2aFailCount, 1) memberMu.Lock() result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库失败: %s", memberIdx+1, lastError)) memberMu.Unlock() } return s2aSuccess } // 注册并入库单个成员的函数(带1次重试)- 流水线模式 var s2aWg sync.WaitGroup registerAndS2AMember := func(memberIdx int, email, password string) bool { name := register.GenerateName() birthdate := register.GenerateBirthdate() memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1) regStartTime := time.Now() for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次(首次+1次重试) // 检查是否应该停止 if isStopped() || isTeamExhausted() { return false } currentEmail := email currentPassword := password if attempt > 0 { // 注册失败重试:保持原邮箱(邀请已发送),仅重新注册 logger.Warning(fmt.Sprintf("%s 注册重试 (第%d次), 保持邮箱: %s", memberLogPrefix, attempt+1, currentEmail), currentEmail, "team") } // 首次尝试时发送邀请,重试时跳过(邀请已发送到该邮箱) if attempt == 0 { if err := inviter.SendInvites([]string{currentEmail}); err != nil { errStr := err.Error() logger.Error(fmt.Sprintf("%s 邀请失败: %v", memberLogPrefix, err), currentEmail, "team") // 检测 Team 已达邀请上限(401 或 maximum number of seats) if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") { markTeamExhausted() return false } // 邀请失败时换新邮箱重试 email = mail.GenerateEmail() password = register.GeneratePassword() currentEmail = email currentPassword = password logger.Warning(fmt.Sprintf("%s 邀请失败,换新邮箱: %s", memberLogPrefix, currentEmail), currentEmail, "team") continue } } // 再次检查是否应该停止(邀请期间其他 goroutine 可能已标记) if isTeamExhausted() { return false } // 注册 - 从代理池获取随机代理 regProxy := req.Proxy if database.Instance != nil { if poolProxy, poolErr := database.Instance.GetRandomCodexProxy(); poolErr == nil && poolProxy != "" { regProxy = poolProxy } } regProxyDisplay := "直连" if regProxy != "" { regProxyDisplay = getProxyDisplay(regProxy) } logger.Info(fmt.Sprintf("%s Email: %s | Password: %s | Proxy: %s", memberLogPrefix, currentEmail, currentPassword, regProxyDisplay), currentEmail, "team") _, err := register.APIRegister(currentEmail, currentPassword, name, birthdate, regProxy, memberLogPrefix) if err != nil { logger.Error(fmt.Sprintf("%s [注册失败] %v", memberLogPrefix, err), currentEmail, "team") continue } // 注册成功 regDuration := time.Since(regStartTime) memberMu.Lock() children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true} result.MemberEmails = append(result.MemberEmails, currentEmail) result.Registered++ memberMu.Unlock() logger.Success(fmt.Sprintf("%s ✓ 注册成功 (耗时: %.1fs)", memberLogPrefix, regDuration.Seconds()), currentEmail, "team") // 流水线:注册成功后等待再开始入库(避免触发邮箱验证 + 成员间错开避免并发冲突) s2aWg.Add(1) go func(idx int, e, p string) { defer s2aWg.Done() // 基础 1 秒 + 成员索引 * 1 秒错开 + 0~1 秒随机抖动 stagger := 1*time.Second + time.Duration(idx)*time.Second + time.Duration(rand.Intn(1000))*time.Millisecond select { case <-time.After(stagger): case <-teamProcessState.stopCh: return } success := doS2A(idx, e, p) memberMu.Lock() children[idx].S2ADone = true children[idx].S2AOK = success memberMu.Unlock() }(memberIdx, currentEmail, currentPassword) return true } return false } // 第一轮:并发注册成员(注册成功后立即入库) logger.Info(fmt.Sprintf("%s ════════ 开始流水线处理 ════════ 目标: %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team") pipelineStartTime := time.Now() var regWg sync.WaitGroup for i := 0; i < req.MembersPerTeam; i++ { regWg.Add(1) go func(idx int) { defer regWg.Done() // 检查是否应该停止 if isStopped() || isTeamExhausted() { return } email := mail.GenerateEmail() password := register.GeneratePassword() registerAndS2AMember(idx, email, password) }(i) } if !waitGroupWithTimeout(®Wg, 8*time.Minute) { logger.Warning(fmt.Sprintf("%s 注册阶段超时 (8分钟),继续处理已完成的成员", logPrefix), owner.Email, "team") } // 如果 Team 已满,等待已启动的入库完成 if isTeamExhausted() { if !waitGroupWithTimeout(&s2aWg, 5*time.Minute) { logger.Warning(fmt.Sprintf("%s 入库等待超时 (5分钟)", logPrefix), owner.Email, "team") } result.AddedToS2A = int(atomic.LoadInt32(&s2aSuccessCount)) result.Errors = append(result.Errors, "Team 邀请已满") result.DurationMs = time.Since(startTime).Milliseconds() markOwnerResult(result.AddedToS2A > 0) return result } // 统计失败的成员 failedSlots := make([]int, 0) for i, c := range children { if !c.Success { failedSlots = append(failedSlots, i) } } // 第二轮:Team 有 4 次额外补救机会(如果 Team 未满) teamRetries := 4 for retry := 0; retry < teamRetries && len(failedSlots) > 0 && !isStopped() && !isTeamExhausted(); retry++ { slotIdx := failedSlots[0] logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team") email := mail.GenerateEmail() password := register.GeneratePassword() if registerAndS2AMember(slotIdx, email, password) { failedSlots = failedSlots[1:] // 成功,移除这个槽位 } } // 等待所有入库完成 if !waitGroupWithTimeout(&s2aWg, 10*time.Minute) { logger.Warning(fmt.Sprintf("%s 入库阶段超时 (10分钟),继续统计结果", logPrefix), owner.Email, "team") } // 补救后再次检查 Team 是否已满 if isTeamExhausted() { result.AddedToS2A = int(atomic.LoadInt32(&s2aSuccessCount)) result.Errors = append(result.Errors, "Team 邀请已满") result.DurationMs = time.Since(startTime).Milliseconds() markOwnerResult(result.AddedToS2A > 0) return result } // 统计最终结果 if len(failedSlots) > 0 { result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots))) } result.AddedToS2A = int(atomic.LoadInt32(&s2aSuccessCount)) pipelineDuration := time.Since(pipelineStartTime) logger.Info(fmt.Sprintf("%s ════════ 流水线完成 ════════ 注册: %d/%d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, req.MembersPerTeam, result.AddedToS2A, pipelineDuration.Seconds()), owner.Email, "team") // 如果没有任何成员注册成功,跳过母号入库 if result.Registered == 0 { logger.Warning(fmt.Sprintf("%s 没有成员注册成功,跳过母号入库", logPrefix), owner.Email, "team") result.DurationMs = time.Since(startTime).Milliseconds() markOwnerResult(false) return result } // Step 4: 母号也入库(如果开启)- 带重试 if req.IncludeOwner && !isStopped() { ownerLogPrefix := fmt.Sprintf("%s [母号 ]", logPrefix) ownerStartTime := time.Now() logger.Status(fmt.Sprintf("%s 母号入库中...", ownerLogPrefix), owner.Email, "team") var ownerSuccess bool var lastError string for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 // 检查停止信号 if isStopped() { break } if attempt > 0 { logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", ownerLogPrefix, attempt+1), owner.Email, "team") } // 创建日志回调(输出关键日志和调试信息) authLogger := auth.NewAuthLogger(owner.Email, logPrefix, 0, func(entry auth.AuthLogEntry) { if entry.IsError { logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, entry.Message), owner.Email, "team") } else { switch entry.Step { case auth.StepNavigate, auth.StepInputEmail, auth.StepInputPassword, auth.StepComplete, auth.StepConsent, auth.StepSelectWorkspace: logger.Info(fmt.Sprintf("%s %s", ownerLogPrefix, entry.Message), owner.Email, "team") } } }) s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) if err != nil { lastError = fmt.Sprintf("获取授权URL失败: %v", err) logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team") continue } var code string // 根据全局配置决定授权方式 if config.Global.AuthMethod == "api" { // 使用纯 API 模式(CodexAuth)- 使用 S2A 生成的授权 URL // 从代理池随机选择代理 proxyToUse := req.Proxy if poolProxy, poolErr := database.Instance.GetRandomCodexProxy(); poolErr == nil && poolProxy != "" { proxyToUse = poolProxy logger.Info(fmt.Sprintf("%s 使用代理池: %s", ownerLogPrefix, getProxyDisplay(poolProxy)), owner.Email, "team") } code, err = auth.CompleteWithCodexAPI(owner.Email, owner.Password, teamID, s2aResp.Data.AuthURL, s2aResp.Data.SessionID, proxyToUse, authLogger) // 更新代理统计 if proxyToUse != req.Proxy && proxyToUse != "" { database.Instance.UpdateCodexProxyStats(proxyToUse, err == nil) } } else { // 使用 Chromedp 浏览器自动化 code, err = auth.CompleteWithChromedpLogged(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy, authLogger) } if err != nil { lastError = fmt.Sprintf("浏览器授权失败: %v", err) logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team") continue } // 提交到 S2A _, err = auth.SubmitS2AOAuth( config.Global.S2AApiBase, config.Global.S2AAdminKey, s2aResp.Data.SessionID, code, "team-"+owner.Email, config.Global.Concurrency, config.Global.Priority, config.Global.GroupIDs, config.Global.ProxyID, ) if err != nil { lastError = fmt.Sprintf("S2A提交失败: %v", err) logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team") continue } ownerSuccess = true result.AddedToS2A++ ownerDuration := time.Since(ownerStartTime) logger.Success(fmt.Sprintf("%s ✓ 入库成功 (总耗时: %.1fs)", ownerLogPrefix, ownerDuration.Seconds()), owner.Email, "team") break } if !ownerSuccess { result.Errors = append(result.Errors, fmt.Sprintf("母号入库失败: %s", lastError)) } } result.DurationMs = time.Since(startTime).Milliseconds() logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team") // 根据入库结果标记 owner 状态 // 只要有任何一个账号入库成功,就标记为已使用 markOwnerResult(result.AddedToS2A > 0) return result }