package api

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"sync/atomic"
	"time"

	"codex-pool/internal/auth"
	"codex-pool/internal/config"
	"codex-pool/internal/database"
	"codex-pool/internal/invite"
	"codex-pool/internal/logger"
	"codex-pool/internal/mail"
	"codex-pool/internal/register"
)

// TeamOwner holds the credentials of one team owner ("parent") account.
type TeamOwner struct {
	Email     string `json:"email"`
	Password  string `json:"password"`
	Token     string `json:"token"`
	AccountID string `json:"account_id"` // stored account_id; used directly when non-empty
}

// TeamProcessRequest is the JSON payload accepted by HandleTeamProcess.
type TeamProcessRequest struct {
	// Owners is the list of owner accounts to process. When empty, pending
	// owners are loaded from the database instead.
	Owners []TeamOwner `json:"owners"`

	// Settings.
	MembersPerTeam  int    `json:"members_per_team"` // members to register per team
	ConcurrentTeams int    `json:"concurrent_teams"` // number of teams processed concurrently
	BrowserType     string `json:"browser_type"`     // "chromedp" or "rod"
	Headless        bool   `json:"headless"`         // run the browser headless
	Proxy           string `json:"proxy"`            // proxy setting
	IncludeOwner    bool   `json:"include_owner"`    // also import the owner account into S2A
	ProcessCount    int    `json:"process_count"`    // number of owners to process; 0 means all
}

// TeamProcessResult is the outcome of processing a single team.
type TeamProcessResult struct {
	TeamIndex    int      `json:"team_index"`
	OwnerEmail   string   `json:"owner_email"`
	TeamID       string   `json:"team_id"`
	Registered   int      `json:"registered"`
	AddedToS2A   int      `json:"added_to_s2a"`
	MemberEmails []string `json:"member_emails"`
	Errors       []string `json:"errors"`
	DurationMs   int64    `json:"duration_ms"`
}

// TeamProcessState is the shared state of the current (or last) batch run.
//
// Running, StartedAt, TotalTeams and Results are guarded by mu. Completed is
// accessed only through sync/atomic.
type TeamProcessState struct {
	Running    bool                `json:"running"`
	StartedAt  time.Time           `json:"started_at"`
	TotalTeams int                 `json:"total_teams"`
	Completed  int32               `json:"completed"`
	Results    []TeamProcessResult `json:"results"`
	mu         sync.Mutex
}

var teamProcessState = &TeamProcessState{}

// isRunning reports whether a batch is currently marked as running.
func (s *TeamProcessState) isRunning() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.Running
}

// setRunning sets the running flag under the state mutex.
func (s *TeamProcessState) setRunning(running bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.Running = running
}

// tryStart atomically checks that no batch is running and, if so, resets the
// state for a new batch of totalTeams teams. It returns the recorded start
// time and true on success, or the zero time and false when a batch is
// already running.
func (s *TeamProcessState) tryStart(totalTeams int) (time.Time, bool) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.Running {
		return time.Time{}, false
	}
	s.Running = true
	s.StartedAt = time.Now()
	s.TotalTeams = totalTeams
	atomic.StoreInt32(&s.Completed, 0)
	s.Results = make([]TeamProcessResult, 0, totalTeams)
	return s.StartedAt, true
}

// HandleTeamProcess handles POST /api/team/process and starts a batch run in
// the background. It responds with 409 when a batch is already running.
func HandleTeamProcess(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	// Cheap early rejection; the authoritative check is tryStart below.
	if teamProcessState.isRunning() {
		Error(w, http.StatusConflict, "已有任务正在运行")
		return
	}

	var req TeamProcessRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		Error(w, http.StatusBadRequest, "请求格式错误")
		return
	}

	// No owners in the request: load the pending owners from the database.
	if len(req.Owners) == 0 {
		if database.Instance == nil {
			Error(w, http.StatusInternalServerError, "获取待处理账号失败: 数据库未初始化")
			return
		}
		pendingOwners, err := database.Instance.GetPendingOwners()
		if err != nil {
			Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err))
			return
		}
		if len(pendingOwners) == 0 {
			Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件")
			return
		}
		// Convert to the request shape, carrying over the stored account_id.
		for _, o := range pendingOwners {
			req.Owners = append(req.Owners, TeamOwner{
				Email:     o.Email,
				Password:  o.Password,
				Token:     o.Token,
				AccountID: o.AccountID,
			})
		}
		logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team")
	}

	// Apply the ProcessCount limit.
	if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) {
		req.Owners = req.Owners[:req.ProcessCount]
		logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team")
	}

	// Fill in defaults.
	if req.MembersPerTeam <= 0 {
		req.MembersPerTeam = 4
	}
	if req.ConcurrentTeams <= 0 || req.ConcurrentTeams > len(req.Owners) {
		req.ConcurrentTeams = len(req.Owners)
	}
	if req.BrowserType == "" {
		req.BrowserType = "chromedp"
	}
	if req.Proxy == "" && config.Global != nil {
		req.Proxy = config.Global.GetProxy()
	}

	// Claim the run slot and reset the state in one critical section, so two
	// concurrent requests cannot both start a batch.
	startedAt, ok := teamProcessState.tryStart(len(req.Owners))
	if !ok {
		Error(w, http.StatusConflict, "已有任务正在运行")
		return
	}

	go runTeamProcess(req)

	Success(w, map[string]interface{}{
		"message":          "任务已启动",
		"total_teams":      len(req.Owners),
		"concurrent_teams": req.ConcurrentTeams,
		"started_at":       startedAt,
	})
}

// HandleTeamProcessStatus handles GET /api/team/status and reports the state
// of the current (or last) batch run.
func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
		return
	}

	// Snapshot under the lock, then write the response without holding it so
	// a slow client cannot stall the workers appending results.
	teamProcessState.mu.Lock()
	running := teamProcessState.Running
	startedAt := teamProcessState.StartedAt
	totalTeams := teamProcessState.TotalTeams
	var results []TeamProcessResult
	if teamProcessState.Results != nil {
		results = make([]TeamProcessResult, len(teamProcessState.Results))
		copy(results, teamProcessState.Results)
	}
	teamProcessState.mu.Unlock()

	Success(w, map[string]interface{}{
		"running":     running,
		"started_at":  startedAt,
		"total_teams": totalTeams,
		"completed":   atomic.LoadInt32(&teamProcessState.Completed),
		"results":     results,
		"elapsed_ms":  time.Since(startedAt).Milliseconds(),
	})
}

// HandleTeamProcessStop handles POST /api/team/stop. It only clears the
// running flag; workers notice it at their next check.
//
// NOTE(review): a new batch can be started while workers of the stopped batch
// are still draining, and the old batch's cleanup will then clear the running
// flag of the new one. Confirm whether that overlap needs to be prevented.
func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	teamProcessState.setRunning(false)
	Success(w, map[string]string{"message": "已发送停止信号"})
}

// runTeamProcess runs the whole batch with a fixed-size worker pool, collects
// the per-team results into teamProcessState and records the batch summary in
// the database when one is available.
func runTeamProcess(req TeamProcessRequest) {
	totalOwners := len(req.Owners)
	workerCount := req.ConcurrentTeams
	if workerCount > totalOwners {
		workerCount = totalOwners
	}
	if workerCount <= 0 {
		workerCount = 2 // default concurrency
	}

	// Create the batch record.
	var batchID int64
	if database.Instance != nil {
		var err error
		batchID, err = database.Instance.CreateBatchRun(totalOwners)
		if err != nil {
			logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team")
		}
	}

	// Totals, also read by the deferred cleanup.
	var totalRegistered, totalAddedToS2A int
	var allErrors []string

	// Always clear the running flag and update the batch record, whether the
	// run finished normally or was interrupted.
	defer func() {
		teamProcessState.setRunning(false)

		if database.Instance == nil || batchID <= 0 {
			return
		}
		errorsStr := ""
		if len(allErrors) > 0 {
			// Keep only the first 10 errors.
			if len(allErrors) > 10 {
				allErrors = allErrors[:10]
			}
			errorsStr = fmt.Sprintf("%v", allErrors)
		}
		if err := database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr); err != nil {
			logger.Error(fmt.Sprintf("更新批次记录失败: %v", err), "", "team")
		}
	}()

	logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")

	// Both channels are buffered for every owner, so neither the producer
	// nor a worker can block on a send even if the other side stops early.
	taskChan := make(chan int, totalOwners)
	resultChan := make(chan TeamProcessResult, totalOwners)

	var wg sync.WaitGroup

	// Start the workers.
	for w := 0; w < workerCount; w++ {
		wg.Add(1)
		go func(workerID int) {
			defer wg.Done()
			for idx := range taskChan {
				if !teamProcessState.isRunning() {
					return
				}
				resultChan <- processSingleTeam(idx, req)
				atomic.AddInt32(&teamProcessState.Completed, 1)
			}
		}(w)
	}

	// Queue the tasks.
	go func() {
		for i := 0; i < totalOwners; i++ {
			if !teamProcessState.isRunning() {
				break
			}
			taskChan <- i
		}
		close(taskChan)
	}()

	// Close the result channel once every worker has exited.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Collect results and accumulate totals.
	for result := range resultChan {
		teamProcessState.mu.Lock()
		teamProcessState.Results = append(teamProcessState.Results, result)
		teamProcessState.mu.Unlock()

		totalRegistered += result.Registered
		totalAddedToS2A += result.AddedToS2A
		allErrors = append(allErrors, result.Errors...)
	}

	// Success rate relative to the expected number of member accounts.
	expectedTotal := totalOwners * req.MembersPerTeam
	successRate := float64(0)
	if expectedTotal > 0 {
		successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100
	}

	logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%",
		atomic.LoadInt32(&teamProcessState.Completed), totalOwners,
		totalRegistered, totalAddedToS2A, successRate), "", "team")
}

// processSingleTeam processes the owner at req.Owners[idx]: it resolves the
// team ID, probes the invite API, registers req.MembersPerTeam members,
// imports the registered accounts (and optionally the owner) into S2A, and
// finally records the owner's status in the database.
func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
	startTime := time.Now()
	owner := req.Owners[idx]
	result := TeamProcessResult{
		TeamIndex:    idx + 1,
		OwnerEmail:   owner.Email,
		MemberEmails: make([]string, 0),
		Errors:       make([]string, 0),
	}

	// Fixed-width team number (aligned up to Team 99).
	logPrefix := fmt.Sprintf("[Team %2d]", idx+1)
	logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")

	// Mark the owner as being processed.
	if database.Instance != nil {
		database.Instance.MarkOwnerAsProcessing(owner.Email)
	}

	// markOwnerResult records the final owner status: used on success,
	// failed (eligible for retry) otherwise.
	markOwnerResult := func(success bool) {
		if database.Instance == nil {
			return
		}
		if success {
			database.Instance.MarkOwnerAsUsed(owner.Email)
		} else {
			database.Instance.MarkOwnerAsFailed(owner.Email)
		}
	}

	// Step 1: resolve the team ID, preferring the stored account_id.
	var teamID string
	inviter := invite.NewWithProxy(owner.Token, req.Proxy)
	if owner.AccountID != "" {
		teamID = owner.AccountID
		inviter.SetAccountID(teamID) // the inviter must be told explicitly
		logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team")
	} else {
		// Nothing stored: ask the API.
		var err error
		teamID, err = inviter.GetAccountID()
		if err != nil {
			result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
			result.DurationMs = time.Since(startTime).Milliseconds()
			logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
			markOwnerResult(false)
			return result
		}
		logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team")
	}
	result.TeamID = teamID

	// Step 2: probe the invite API to detect a banned team.
	testEmail := mail.GenerateEmail()
	if err := inviter.SendInvites([]string{testEmail}); err != nil {
		errStr := err.Error()
		if strings.Contains(errStr, "403") ||
			strings.Contains(errStr, "forbidden") ||
			strings.Contains(errStr, "banned") ||
			strings.Contains(errStr, "suspended") ||
			strings.Contains(errStr, "deactivated") {
			// Banned team: mark the owner invalid and stop here. The
			// owner is deliberately not passed to markOwnerResult, which
			// would make it retryable again.
			logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
			if database.Instance != nil {
				database.Instance.MarkOwnerAsInvalid(owner.Email)
			}
			result.Errors = append(result.Errors, "Team 被封禁")
			result.DurationMs = time.Since(startTime).Milliseconds()
			return result
		}
		// Any other invite error: keep going.
		logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team")
	}

	// Step 3: register members concurrently.
	// Each member: invite -> register, with one retry on failure.
	// The team as a whole then gets 4 additional remedial attempts.
	type MemberAccount struct {
		Email    string
		Password string
		Success  bool
	}

	children := make([]MemberAccount, req.MembersPerTeam)
	var memberMu sync.Mutex // guards children and seatsFull
	var memberWg sync.WaitGroup
	seatsFull := false // set once the invite API reports the seat limit

	seatsAreFull := func() bool {
		memberMu.Lock()
		defer memberMu.Unlock()
		return seatsFull
	}

	// registerMember invites and registers one member into slot memberIdx,
	// retrying once with a fresh email/password. It reports success.
	registerMember := func(memberIdx int, email, password string) bool {
		name := register.GenerateName()
		birthdate := register.GenerateBirthdate()

		for attempt := 0; attempt < 2; attempt++ { // first try + 1 retry
			if !teamProcessState.isRunning() {
				return false
			}

			currentEmail := email
			currentPassword := password
			if attempt > 0 {
				// Retry with a fresh mailbox.
				currentEmail = mail.GenerateEmail()
				currentPassword = register.GeneratePassword()
				logger.Warning(fmt.Sprintf("%s [成员 %d] 重试, 新邮箱: %s", logPrefix, memberIdx+1, currentEmail), currentEmail, "team")
			}

			// Send the invite.
			if err := inviter.SendInvites([]string{currentEmail}); err != nil {
				errStr := err.Error()
				logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")

				// The team has reached its invite limit.
				if strings.Contains(errStr, "maximum number of seats") {
					logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用", logPrefix), owner.Email, "team")
					if database.Instance != nil {
						database.Instance.MarkOwnerAsUsed(owner.Email)
					}
					// Remember it so the final status update does not
					// revert the owner to failed/retryable.
					memberMu.Lock()
					seatsFull = true
					memberMu.Unlock()
					// No point retrying this member.
					return false
				}
				continue
			}

			// Register the account.
			if _, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy); err != nil {
				logger.Error(fmt.Sprintf("%s [成员 %d] 注册失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
				continue
			}

			// Success.
			memberMu.Lock()
			children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true}
			memberMu.Unlock()
			logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 注册成功", logPrefix, memberIdx+1), currentEmail, "team")
			return true
		}
		return false
	}

	// Round one: register all members concurrently.
	logger.Info(fmt.Sprintf("%s 开始并发注册 %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team")
	for i := 0; i < req.MembersPerTeam; i++ {
		memberWg.Add(1)
		go func(idx int) {
			defer memberWg.Done()
			email := mail.GenerateEmail()
			password := register.GeneratePassword()
			logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team")
			registerMember(idx, email, password)
		}(i)
	}
	memberWg.Wait()

	// Collect the slots that are still empty.
	failedSlots := make([]int, 0)
	for i, c := range children {
		if !c.Success {
			failedSlots = append(failedSlots, i)
		}
	}

	// Round two: up to 4 remedial attempts for the team. Skipped once the
	// seat limit is reached, since every further invite would fail too.
	teamRetries := 4
	for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.isRunning() && !seatsAreFull(); retry++ {
		slotIdx := failedSlots[0]
		logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")

		email := mail.GenerateEmail()
		password := register.GeneratePassword()
		if registerMember(slotIdx, email, password) {
			failedSlots = failedSlots[1:] // filled; drop this slot
		}
	}

	// Count the registered members.
	registeredChildren := make([]MemberAccount, 0, len(children))
	for _, c := range children {
		if c.Success {
			registeredChildren = append(registeredChildren, c)
			result.MemberEmails = append(result.MemberEmails, c.Email)
			result.Registered++
		}
	}

	if len(failedSlots) > 0 {
		result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots)))
	}

	logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team")

	// Steps 4 and 5 need the global config; without it there is nothing to
	// submit to, so record the problem instead of dereferencing nil.
	cfg := config.Global
	if cfg == nil {
		result.Errors = append(result.Errors, "global config not loaded, S2A import skipped")
	} else {
		// importToS2A runs the S2A OAuth flow for one account. On failure
		// it returns the name of the failing stage and the error.
		importToS2A := func(email, password string) (string, error) {
			s2aResp, err := auth.GenerateS2AAuthURL(cfg.S2AApiBase, cfg.S2AAdminKey, cfg.ProxyID)
			if err != nil {
				return "auth URL", err
			}

			// Pick the browser automation backend from the request.
			var code string
			if req.BrowserType == "rod" {
				code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, email, password, teamID, req.Headless, req.Proxy)
			} else {
				code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, email, password, teamID, req.Headless, req.Proxy)
			}
			if err != nil {
				return "browser", err
			}

			// Submit to S2A.
			_, err = auth.SubmitS2AOAuth(
				cfg.S2AApiBase,
				cfg.S2AAdminKey,
				s2aResp.Data.SessionID,
				code,
				email,
				cfg.Concurrency,
				cfg.Priority,
				cfg.GroupIDs,
				cfg.ProxyID,
			)
			if err != nil {
				return "S2A", err
			}
			return "", nil
		}

		// Step 4: import the registered members into S2A.
		for i, child := range registeredChildren {
			if !teamProcessState.isRunning() {
				break
			}
			if stage, err := importToS2A(child.Email, child.Password); err != nil {
				result.Errors = append(result.Errors, fmt.Sprintf("Member %d %s: %v", i+1, stage, err))
				continue
			}
			result.AddedToS2A++
			logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team")
		}

		// Step 5: import the owner as well, when requested.
		if req.IncludeOwner && teamProcessState.isRunning() {
			logger.Info(fmt.Sprintf("%s 开始将母号入库到 S2A", logPrefix), owner.Email, "team")
			if stage, err := importToS2A(owner.Email, owner.Password); err != nil {
				result.Errors = append(result.Errors, fmt.Sprintf("Owner %s: %v", stage, err))
			} else {
				result.AddedToS2A++
				logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team")
			}
		}
	}

	result.DurationMs = time.Since(startTime).Milliseconds()
	logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team")

	// The owner counts as used when at least one account was imported, or
	// when the team has no seats left (retrying it could never succeed).
	markOwnerResult(result.AddedToS2A > 0 || seatsAreFull())

	return result
}

// registerWithTimeout runs the full registration flow for one account and
// returns the registration client on success. The only timeouts applied here
// are the two waits for the verification code (5s, then 15s).
func registerWithTimeout(email, password, name, birthdate, proxy string) (*register.ChatGPTReg, error) {
	reg, err := register.New(proxy)
	if err != nil {
		return nil, err
	}

	if err := reg.InitSession(); err != nil {
		return nil, fmt.Errorf("初始化失败: %w", err)
	}
	if err := reg.GetAuthorizeURL(email); err != nil {
		return nil, fmt.Errorf("获取授权URL失败: %w", err)
	}
	if err := reg.StartAuthorize(); err != nil {
		return nil, fmt.Errorf("启动授权失败: %w", err)
	}
	if err := reg.Register(email, password); err != nil {
		return nil, fmt.Errorf("注册失败: %w", err)
	}
	if err := reg.SendVerificationEmail(); err != nil {
		return nil, fmt.Errorf("发送邮件失败: %w", err)
	}

	// Try a short wait first, then fall back to a longer one.
	otpCode, err := mail.GetVerificationCode(email, 5*time.Second)
	if err != nil {
		otpCode, err = mail.GetVerificationCode(email, 15*time.Second)
		if err != nil {
			return nil, fmt.Errorf("验证码获取超时: %w", err)
		}
	}

	if err := reg.ValidateOTP(otpCode); err != nil {
		return nil, fmt.Errorf("OTP验证失败: %w", err)
	}
	if err := reg.CreateAccount(name, birthdate); err != nil {
		return nil, fmt.Errorf("创建账户失败: %w", err)
	}

	// NOTE(review): the result of GetSessionToken is discarded, as in the
	// original; confirm whether a failure here should fail the registration.
	_ = reg.GetSessionToken()
	return reg, nil
}