Files
codexautopool/backend/internal/api/team_process.go

611 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package api
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"codex-pool/internal/auth"
"codex-pool/internal/config"
"codex-pool/internal/database"
"codex-pool/internal/invite"
"codex-pool/internal/logger"
"codex-pool/internal/mail"
"codex-pool/internal/register"
)
// TeamOwner 团队母号信息
type TeamOwner struct {
Email string `json:"email"`
Password string `json:"password"`
Token string `json:"token"`
AccountID string `json:"account_id"` // 已存储的 account_id如有则直接使用
}
// TeamProcessRequest 团队处理请求
type TeamProcessRequest struct {
// Owner 账号列表
Owners []TeamOwner `json:"owners"`
// 配置
MembersPerTeam int `json:"members_per_team"` // 每个 Team 的成员数
ConcurrentTeams int `json:"concurrent_teams"` // 并发 Team 数量
BrowserType string `json:"browser_type"` // "chromedp" 或 "rod"
Headless bool `json:"headless"` // 是否无头模式
Proxy string `json:"proxy"` // 代理设置
IncludeOwner bool `json:"include_owner"` // 母号也入库到 S2A
ProcessCount int `json:"process_count"` // 处理数量0表示全部
}
// TeamProcessResult 团队处理结果
type TeamProcessResult struct {
TeamIndex int `json:"team_index"`
OwnerEmail string `json:"owner_email"`
TeamID string `json:"team_id"`
Registered int `json:"registered"`
AddedToS2A int `json:"added_to_s2a"`
MemberEmails []string `json:"member_emails"`
Errors []string `json:"errors"`
DurationMs int64 `json:"duration_ms"`
}
// TeamProcessState 处理状态
type TeamProcessState struct {
Running bool `json:"running"`
StartedAt time.Time `json:"started_at"`
TotalTeams int `json:"total_teams"`
Completed int32 `json:"completed"`
Results []TeamProcessResult `json:"results"`
mu sync.Mutex
}
var teamProcessState = &TeamProcessState{}
// HandleTeamProcess POST /api/team/process - 启动 Team 批量处理
func HandleTeamProcess(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
return
}
// 检查是否正在运行
if teamProcessState.Running {
Error(w, http.StatusConflict, "已有任务正在运行")
return
}
var req TeamProcessRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
Error(w, http.StatusBadRequest, "请求格式错误")
return
}
// 如果没有传入 owners从数据库获取待处理的母号
if len(req.Owners) == 0 {
pendingOwners, err := database.Instance.GetPendingOwners()
if err != nil {
Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err))
return
}
if len(pendingOwners) == 0 {
Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件")
return
}
// 转换为请求格式(包含已存储的 account_id
for _, o := range pendingOwners {
req.Owners = append(req.Owners, TeamOwner{
Email: o.Email,
Password: o.Password,
Token: o.Token,
AccountID: o.AccountID, // 直接使用数据库中存储的 account_id
})
}
logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team")
}
// 根据 ProcessCount 限制处理数量
if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) {
req.Owners = req.Owners[:req.ProcessCount]
logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team")
}
if req.MembersPerTeam <= 0 {
req.MembersPerTeam = 4
}
if req.ConcurrentTeams <= 0 {
req.ConcurrentTeams = len(req.Owners)
}
if req.ConcurrentTeams > len(req.Owners) {
req.ConcurrentTeams = len(req.Owners)
}
if req.BrowserType == "" {
req.BrowserType = "chromedp" // 默认使用 Chromedp
}
if req.Proxy == "" && config.Global != nil {
req.Proxy = config.Global.GetProxy() // 使用新的代理获取方法
}
// 初始化状态
teamProcessState.Running = true
teamProcessState.StartedAt = time.Now()
teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理
teamProcessState.Completed = 0
teamProcessState.Results = make([]TeamProcessResult, 0, len(req.Owners))
// 异步执行
go runTeamProcess(req)
Success(w, map[string]interface{}{
"message": "任务已启动",
"total_teams": len(req.Owners),
"concurrent_teams": req.ConcurrentTeams,
"started_at": teamProcessState.StartedAt,
})
}
// HandleTeamProcessStatus GET /api/team/status - 获取处理状态
func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
return
}
teamProcessState.mu.Lock()
defer teamProcessState.mu.Unlock()
Success(w, map[string]interface{}{
"running": teamProcessState.Running,
"started_at": teamProcessState.StartedAt,
"total_teams": teamProcessState.TotalTeams,
"completed": teamProcessState.Completed,
"results": teamProcessState.Results,
"elapsed_ms": time.Since(teamProcessState.StartedAt).Milliseconds(),
})
}
// HandleTeamProcessStop POST /api/team/stop - 停止处理
func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
return
}
teamProcessState.Running = false
Success(w, map[string]string{"message": "已发送停止信号"})
}
// runTeamProcess 执行 Team 批量处理 - 使用工作池模式
func runTeamProcess(req TeamProcessRequest) {
defer func() {
teamProcessState.Running = false
}()
totalOwners := len(req.Owners)
workerCount := req.ConcurrentTeams // 同时运行的 worker 数量
if workerCount > totalOwners {
workerCount = totalOwners
}
if workerCount <= 0 {
workerCount = 2 // 默认 2 个并发
}
// 创建批次记录
var batchID int64
if database.Instance != nil {
var err error
batchID, err = database.Instance.CreateBatchRun(totalOwners)
if err != nil {
logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team")
}
}
logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")
// 任务队列
taskChan := make(chan int, totalOwners)
resultChan := make(chan TeamProcessResult, totalOwners)
var wg sync.WaitGroup
// 启动 worker
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for idx := range taskChan {
if !teamProcessState.Running {
return
}
result := processSingleTeam(idx, req)
resultChan <- result
atomic.AddInt32(&teamProcessState.Completed, 1)
}
}(w)
}
// 发送任务
go func() {
for i := 0; i < totalOwners; i++ {
if !teamProcessState.Running {
break
}
taskChan <- i
}
close(taskChan)
}()
// 等待完成并收集结果
go func() {
wg.Wait()
close(resultChan)
}()
// 统计总数
var totalRegistered, totalAddedToS2A int
var allErrors []string
for result := range resultChan {
teamProcessState.mu.Lock()
teamProcessState.Results = append(teamProcessState.Results, result)
teamProcessState.mu.Unlock()
totalRegistered += result.Registered
totalAddedToS2A += result.AddedToS2A
allErrors = append(allErrors, result.Errors...)
}
// 更新批次记录
if database.Instance != nil && batchID > 0 {
errorsStr := ""
if len(allErrors) > 0 {
// 只保留前10条错误
if len(allErrors) > 10 {
allErrors = allErrors[:10]
}
errorsStr = fmt.Sprintf("%v", allErrors)
}
database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr)
}
// 计算成功率
expectedTotal := totalOwners * req.MembersPerTeam
successRate := float64(0)
if expectedTotal > 0 {
successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100
}
logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%",
teamProcessState.Completed, totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team")
}
// processSingleTeam 处理单个 Team
func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
startTime := time.Now()
owner := req.Owners[idx]
result := TeamProcessResult{
TeamIndex: idx + 1,
OwnerEmail: owner.Email,
MemberEmails: make([]string, 0),
Errors: make([]string, 0),
}
// 固定宽度的 Team 编号 (支持到 Team 99)
logPrefix := fmt.Sprintf("[Team %2d]", idx+1)
logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")
// 标记 owner 为处理中
if database.Instance != nil {
database.Instance.MarkOwnerAsProcessing(owner.Email)
}
// 处理失败时的清理函数
markOwnerResult := func(success bool) {
if database.Instance != nil {
if success {
database.Instance.MarkOwnerAsUsed(owner.Email)
} else {
// 失败时恢复为 valid允许重试
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}
// Step 1: 获取 Team ID优先使用已存储的 account_id
var teamID string
inviter := invite.NewWithProxy(owner.Token, req.Proxy)
if owner.AccountID != "" {
// 直接使用数据库中存储的 account_id
teamID = owner.AccountID
inviter.SetAccountID(teamID) // 必须设置到 inviter 中
logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team")
} else {
// 如果没有存储,才请求 API 获取
var err error
teamID, err = inviter.GetAccountID()
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
markOwnerResult(false)
return result
}
logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team")
}
result.TeamID = teamID
// Step 2: 测试邀请功能(检测 Team 是否被封禁)
testEmail := mail.GenerateEmail()
if err := inviter.SendInvites([]string{testEmail}); err != nil {
// 邀请失败,可能是 Team 被封禁
errStr := err.Error()
if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") ||
strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") ||
strings.Contains(errStr, "deactivated") {
// Team 被封禁,标记为 invalid
logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
}
result.Errors = append(result.Errors, "Team 被封禁")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 其他邀请错误,继续尝试
logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team")
}
// Step 3: 并发注册成员
// 每个成员:邀请 → 注册失败重试1次
// Team 有4次额外补救机会
type MemberAccount struct {
Email string
Password string
Success bool
}
children := make([]MemberAccount, req.MembersPerTeam)
var memberMu sync.Mutex
var memberWg sync.WaitGroup
// 注册单个成员的函数带1次重试
registerMember := func(memberIdx int, email, password string) bool {
name := register.GenerateName()
birthdate := register.GenerateBirthdate()
for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次首次+1次重试
if !teamProcessState.Running {
return false
}
currentEmail := email
currentPassword := password
if attempt > 0 {
// 重试时使用新邮箱
currentEmail = mail.GenerateEmail()
currentPassword = register.GeneratePassword()
logger.Warning(fmt.Sprintf("%s [成员 %d] 重试, 新邮箱: %s", logPrefix, memberIdx+1, currentEmail), currentEmail, "team")
}
// 发送邀请
if err := inviter.SendInvites([]string{currentEmail}); err != nil {
errStr := err.Error()
logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
// 检测 Team 已达邀请上限
if strings.Contains(errStr, "maximum number of seats") {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
}
// 跳出重试,该成员不再处理
return false
}
continue
}
// 注册
_, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 注册失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
continue
}
// 成功
memberMu.Lock()
children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true}
memberMu.Unlock()
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 注册成功", logPrefix, memberIdx+1), currentEmail, "team")
return true
}
return false
}
// 第一轮并发注册4个成员
logger.Info(fmt.Sprintf("%s 开始并发注册 %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team")
for i := 0; i < req.MembersPerTeam; i++ {
memberWg.Add(1)
go func(idx int) {
defer memberWg.Done()
email := mail.GenerateEmail()
password := register.GeneratePassword()
logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team")
registerMember(idx, email, password)
}(i)
}
memberWg.Wait()
// 统计失败的成员
failedSlots := make([]int, 0)
for i, c := range children {
if !c.Success {
failedSlots = append(failedSlots, i)
}
}
// 第二轮Team 有 4 次额外补救机会
teamRetries := 4
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running; retry++ {
slotIdx := failedSlots[0]
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
email := mail.GenerateEmail()
password := register.GeneratePassword()
if registerMember(slotIdx, email, password) {
failedSlots = failedSlots[1:] // 成功,移除这个槽位
}
}
// 统计注册成功数
registeredChildren := make([]MemberAccount, 0)
for _, c := range children {
if c.Success {
registeredChildren = append(registeredChildren, c)
result.MemberEmails = append(result.MemberEmails, c.Email)
result.Registered++
}
}
if len(failedSlots) > 0 {
result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots)))
}
logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team")
// Step 4: S2A 授权入库(成员)
for i, child := range registeredChildren {
if !teamProcessState.Running {
break
}
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Member %d auth URL: %v", i+1, err))
continue
}
// 根据配置选择浏览器自动化
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
} else {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
}
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Member %d browser: %v", i+1, err))
continue
}
// 提交到 S2A
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
child.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Member %d S2A: %v", i+1, err))
continue
}
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team")
}
// Step 5: 母号也入库(如果开启)
if req.IncludeOwner && teamProcessState.Running {
logger.Info(fmt.Sprintf("%s 开始将母号入库到 S2A", logPrefix), owner.Email, "team")
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Owner auth URL: %v", err))
} else {
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy)
} else {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy)
}
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Owner browser: %v", err))
} else {
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
owner.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Owner S2A: %v", err))
} else {
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team")
}
}
}
}
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team")
// 根据入库结果标记 owner 状态
// 只要有任何一个账号入库成功,就标记为已使用
markOwnerResult(result.AddedToS2A > 0)
return result
}
// registerWithTimeout 带超时的注册
func registerWithTimeout(email, password, name, birthdate, proxy string) (*register.ChatGPTReg, error) {
reg, err := register.New(proxy)
if err != nil {
return nil, err
}
if err := reg.InitSession(); err != nil {
return nil, fmt.Errorf("初始化失败: %v", err)
}
if err := reg.GetAuthorizeURL(email); err != nil {
return nil, fmt.Errorf("获取授权URL失败: %v", err)
}
if err := reg.StartAuthorize(); err != nil {
return nil, fmt.Errorf("启动授权失败: %v", err)
}
if err := reg.Register(email, password); err != nil {
return nil, fmt.Errorf("注册失败: %v", err)
}
if err := reg.SendVerificationEmail(); err != nil {
return nil, fmt.Errorf("发送邮件失败: %v", err)
}
// 短超时获取验证码
otpCode, err := mail.GetVerificationCode(email, 5*time.Second)
if err != nil {
otpCode, err = mail.GetVerificationCode(email, 15*time.Second)
if err != nil {
return nil, fmt.Errorf("验证码获取超时")
}
}
if err := reg.ValidateOTP(otpCode); err != nil {
return nil, fmt.Errorf("OTP验证失败: %v", err)
}
if err := reg.CreateAccount(name, birthdate); err != nil {
return nil, fmt.Errorf("创建账户失败: %v", err)
}
_ = reg.GetSessionToken()
return reg, nil
}