package api import ( "encoding/json" "fmt" "net/http" "strings" "sync" "sync/atomic" "time" "codex-pool/internal/auth" "codex-pool/internal/config" "codex-pool/internal/database" "codex-pool/internal/invite" "codex-pool/internal/logger" "codex-pool/internal/mail" "codex-pool/internal/register" ) // TeamProcessRequest 团队处理请求 type TeamProcessRequest struct { // Owner 账号列表 Owners []struct { Email string `json:"email"` Password string `json:"password"` Token string `json:"token"` AccountID string `json:"account_id"` // 已存储的 account_id,如有则直接使用 } `json:"owners"` // 配置 MembersPerTeam int `json:"members_per_team"` // 每个 Team 的成员数 ConcurrentTeams int `json:"concurrent_teams"` // 并发 Team 数量 BrowserType string `json:"browser_type"` // "chromedp" 或 "rod" Headless bool `json:"headless"` // 是否无头模式 Proxy string `json:"proxy"` // 代理设置 IncludeOwner bool `json:"include_owner"` // 母号也入库到 S2A } // TeamProcessResult 团队处理结果 type TeamProcessResult struct { TeamIndex int `json:"team_index"` OwnerEmail string `json:"owner_email"` TeamID string `json:"team_id"` Registered int `json:"registered"` AddedToS2A int `json:"added_to_s2a"` MemberEmails []string `json:"member_emails"` Errors []string `json:"errors"` DurationMs int64 `json:"duration_ms"` } // TeamProcessState 处理状态 type TeamProcessState struct { Running bool `json:"running"` StartedAt time.Time `json:"started_at"` TotalTeams int `json:"total_teams"` Completed int32 `json:"completed"` Results []TeamProcessResult `json:"results"` mu sync.Mutex } var teamProcessState = &TeamProcessState{} // HandleTeamProcess POST /api/team/process - 启动 Team 批量处理 func HandleTeamProcess(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { Error(w, http.StatusMethodNotAllowed, "仅支持 POST") return } // 检查是否正在运行 if teamProcessState.Running { Error(w, http.StatusConflict, "已有任务正在运行") return } var req TeamProcessRequest if err := json.NewDecoder(r.Body).Decode(&req); err != nil { Error(w, http.StatusBadRequest, "请求格式错误") return } // 如果没有传入 owners,从数据库获取待处理的母号 if len(req.Owners) == 0 { pendingOwners, err := database.Instance.GetPendingOwners() if err != nil { Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err)) return } if len(pendingOwners) == 0 { Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件") return } // 转换为请求格式(包含已存储的 account_id) for _, o := range pendingOwners { req.Owners = append(req.Owners, struct { Email string `json:"email"` Password string `json:"password"` Token string `json:"token"` AccountID string `json:"account_id"` }{ Email: o.Email, Password: o.Password, Token: o.Token, AccountID: o.AccountID, // 直接使用数据库中存储的 account_id }) } logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team") } if req.MembersPerTeam <= 0 { req.MembersPerTeam = 4 } if req.ConcurrentTeams <= 0 { req.ConcurrentTeams = len(req.Owners) } if req.ConcurrentTeams > len(req.Owners) { req.ConcurrentTeams = len(req.Owners) } if req.BrowserType == "" { req.BrowserType = "chromedp" // 默认使用 Chromedp } if req.Proxy == "" && config.Global != nil { req.Proxy = config.Global.GetProxy() // 使用新的代理获取方法 } // 初始化状态 teamProcessState.Running = true teamProcessState.StartedAt = time.Now() teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理 teamProcessState.Completed = 0 teamProcessState.Results = make([]TeamProcessResult, 0, len(req.Owners)) // 异步执行 go runTeamProcess(req) Success(w, map[string]interface{}{ "message": "任务已启动", "total_teams": len(req.Owners), "concurrent_teams": req.ConcurrentTeams, "started_at": teamProcessState.StartedAt, }) } // HandleTeamProcessStatus GET /api/team/status - 获取处理状态 func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { Error(w, http.StatusMethodNotAllowed, "仅支持 GET") return } teamProcessState.mu.Lock() defer teamProcessState.mu.Unlock() Success(w, map[string]interface{}{ "running": teamProcessState.Running, "started_at": teamProcessState.StartedAt, "total_teams": teamProcessState.TotalTeams, "completed": teamProcessState.Completed, "results": teamProcessState.Results, "elapsed_ms": time.Since(teamProcessState.StartedAt).Milliseconds(), }) } // HandleTeamProcessStop POST /api/team/stop - 停止处理 func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { Error(w, http.StatusMethodNotAllowed, "仅支持 POST") return } teamProcessState.Running = false Success(w, map[string]string{"message": "已发送停止信号"}) } // runTeamProcess 执行 Team 批量处理 - 使用工作池模式 func runTeamProcess(req TeamProcessRequest) { defer func() { teamProcessState.Running = false }() totalOwners := len(req.Owners) workerCount := req.ConcurrentTeams // 同时运行的 worker 数量 if workerCount > totalOwners { workerCount = totalOwners } if workerCount <= 0 { workerCount = 2 // 默认 2 个并发 } logger.Info(fmt.Sprintf("Starting Team process: %d owners, %d concurrent workers", totalOwners, workerCount), "", "team") // 任务队列 taskChan := make(chan int, totalOwners) resultChan := make(chan TeamProcessResult, totalOwners) var wg sync.WaitGroup // 启动 worker for w := 0; w < workerCount; w++ { wg.Add(1) go func(workerID int) { defer wg.Done() for idx := range taskChan { if !teamProcessState.Running { return } result := processSingleTeam(idx, req) resultChan <- result atomic.AddInt32(&teamProcessState.Completed, 1) } }(w) } // 发送任务 go func() { for i := 0; i < totalOwners; i++ { if !teamProcessState.Running { break } taskChan <- i } close(taskChan) }() // 等待完成并收集结果 go func() { wg.Wait() close(resultChan) }() for result := range resultChan { teamProcessState.mu.Lock() teamProcessState.Results = append(teamProcessState.Results, result) teamProcessState.mu.Unlock() } logger.Success(fmt.Sprintf("Team process complete: %d/%d teams processed", teamProcessState.Completed, totalOwners), "", "team") } // processSingleTeam 处理单个 Team func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { startTime := time.Now() owner := req.Owners[idx] result := TeamProcessResult{ TeamIndex: idx + 1, OwnerEmail: owner.Email, MemberEmails: make([]string, 0), Errors: make([]string, 0), } logPrefix := fmt.Sprintf("[Team %d]", idx+1) logger.Info(fmt.Sprintf("%s Starting with owner: %s", logPrefix, owner.Email), owner.Email, "team") // Step 1: 获取 Team ID(优先使用已存储的 account_id) var teamID string inviter := invite.NewWithProxy(owner.Token, req.Proxy) if owner.AccountID != "" { // 直接使用数据库中存储的 account_id teamID = owner.AccountID inviter.SetAccountID(teamID) // 必须设置到 inviter 中 logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team") } else { // 如果没有存储,才请求 API 获取 var err error teamID, err = inviter.GetAccountID() if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err)) result.DurationMs = time.Since(startTime).Milliseconds() logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team") return result } logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team") } result.TeamID = teamID // Step 2: 生成成员邮箱并发送邀请 type MemberAccount struct { Email string Password string Success bool } children := make([]MemberAccount, req.MembersPerTeam) for i := 0; i < req.MembersPerTeam; i++ { children[i].Email = mail.GenerateEmail() children[i].Password = register.GeneratePassword() logger.Info(fmt.Sprintf("%s [Member %d] Email: %s", logPrefix, i+1, children[i].Email), children[i].Email, "team") } // 发送邀请 inviteEmails := make([]string, req.MembersPerTeam) for i, c := range children { inviteEmails[i] = c.Email } if err := inviter.SendInvites(inviteEmails); err != nil { result.Errors = append(result.Errors, fmt.Sprintf("发送邀请失败: %v", err)) result.DurationMs = time.Since(startTime).Milliseconds() return result } logger.Success(fmt.Sprintf("%s Sent %d invite(s)", logPrefix, len(inviteEmails)), owner.Email, "team") // Step 3: 并发注册成员 var memberWg sync.WaitGroup memberMutex := sync.Mutex{} for i := range children { memberWg.Add(1) go func(memberIdx int) { defer memberWg.Done() memberMutex.Lock() email := children[memberIdx].Email password := children[memberIdx].Password memberMutex.Unlock() name := register.GenerateName() birthdate := register.GenerateBirthdate() // 重试逻辑 for attempt := 0; attempt < 3; attempt++ { if !teamProcessState.Running { return } if attempt > 0 { email = mail.GenerateEmail() password = register.GeneratePassword() logger.Info(fmt.Sprintf("%s [Member %d] Retry %d: %s", logPrefix, memberIdx+1, attempt, email), email, "team") if err := inviter.SendInvites([]string{email}); err != nil { continue } } _, err := registerWithTimeout(email, password, name, birthdate, req.Proxy) if err != nil { if strings.Contains(err.Error(), "验证码") { continue } result.Errors = append(result.Errors, fmt.Sprintf("Member %d: %v", memberIdx+1, err)) return } memberMutex.Lock() children[memberIdx].Email = email children[memberIdx].Password = password children[memberIdx].Success = true memberMutex.Unlock() logger.Success(fmt.Sprintf("%s [Member %d] Registered", logPrefix, memberIdx+1), email, "team") return } }(i) } memberWg.Wait() // 统计注册成功数 registeredChildren := make([]MemberAccount, 0) for _, c := range children { if c.Success { registeredChildren = append(registeredChildren, c) result.MemberEmails = append(result.MemberEmails, c.Email) result.Registered++ } } logger.Info(fmt.Sprintf("%s Registered: %d/%d", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team") // Step 4: S2A 授权入库(成员) for i, child := range registeredChildren { if !teamProcessState.Running { break } s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Member %d auth URL: %v", i+1, err)) continue } // 根据配置选择浏览器自动化 var code string if req.BrowserType == "rod" { code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy) } else { code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy) } if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Member %d browser: %v", i+1, err)) continue } // 提交到 S2A _, err = auth.SubmitS2AOAuth( config.Global.S2AApiBase, config.Global.S2AAdminKey, s2aResp.Data.SessionID, code, child.Email, config.Global.Concurrency, config.Global.Priority, config.Global.GroupIDs, config.Global.ProxyID, ) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Member %d S2A: %v", i+1, err)) continue } result.AddedToS2A++ logger.Success(fmt.Sprintf("%s [Member %d] Added to S2A", logPrefix, i+1), child.Email, "team") } // Step 5: 母号也入库(如果开启) if req.IncludeOwner && teamProcessState.Running { logger.Info(fmt.Sprintf("%s 开始将母号入库到 S2A", logPrefix), owner.Email, "team") s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Owner auth URL: %v", err)) } else { var code string if req.BrowserType == "rod" { code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy) } else { code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy) } if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Owner browser: %v", err)) } else { _, err = auth.SubmitS2AOAuth( config.Global.S2AApiBase, config.Global.S2AAdminKey, s2aResp.Data.SessionID, code, owner.Email, config.Global.Concurrency, config.Global.Priority, config.Global.GroupIDs, config.Global.ProxyID, ) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Owner S2A: %v", err)) } else { result.AddedToS2A++ logger.Success(fmt.Sprintf("%s [Owner] Added to S2A", logPrefix), owner.Email, "team") } } } } result.DurationMs = time.Since(startTime).Milliseconds() logger.Success(fmt.Sprintf("%s Complete: %d registered, %d in S2A", logPrefix, result.Registered, result.AddedToS2A), owner.Email, "team") return result } // registerWithTimeout 带超时的注册 func registerWithTimeout(email, password, name, birthdate, proxy string) (*register.ChatGPTReg, error) { reg, err := register.New(proxy) if err != nil { return nil, err } if err := reg.InitSession(); err != nil { return nil, fmt.Errorf("初始化失败: %v", err) } if err := reg.GetAuthorizeURL(email); err != nil { return nil, fmt.Errorf("获取授权URL失败: %v", err) } if err := reg.StartAuthorize(); err != nil { return nil, fmt.Errorf("启动授权失败: %v", err) } if err := reg.Register(email, password); err != nil { return nil, fmt.Errorf("注册失败: %v", err) } if err := reg.SendVerificationEmail(); err != nil { return nil, fmt.Errorf("发送邮件失败: %v", err) } // 短超时获取验证码 otpCode, err := mail.GetVerificationCode(email, 5*time.Second) if err != nil { otpCode, err = mail.GetVerificationCode(email, 15*time.Second) if err != nil { return nil, fmt.Errorf("验证码获取超时") } } if err := reg.ValidateOTP(otpCode); err != nil { return nil, fmt.Errorf("OTP验证失败: %v", err) } if err := reg.CreateAccount(name, birthdate); err != nil { return nil, fmt.Errorf("创建账户失败: %v", err) } _ = reg.GetSessionToken() return reg, nil }