990 lines
32 KiB
Go
990 lines
32 KiB
Go
package api
|
||
|
||
import (
|
||
"encoding/json"
|
||
"fmt"
|
||
"net/http"
|
||
"strings"
|
||
"sync"
|
||
"sync/atomic"
|
||
"time"
|
||
|
||
"codex-pool/internal/auth"
|
||
"codex-pool/internal/config"
|
||
"codex-pool/internal/database"
|
||
"codex-pool/internal/invite"
|
||
"codex-pool/internal/logger"
|
||
"codex-pool/internal/mail"
|
||
"codex-pool/internal/proxyutil"
|
||
"codex-pool/internal/register"
|
||
)
|
||
|
||
// TeamOwner holds the credentials of one team owner ("parent") account that a
// batch run operates on.
type TeamOwner struct {
	Email    string `json:"email"`
	Password string `json:"password"`
	Token    string `json:"token"`
	// AccountID is a previously stored account_id; when it is non-empty it is
	// used directly instead of being looked up again.
	AccountID string `json:"account_id"`
}
|
||
|
||
// TeamProcessRequest 团队处理请求
|
||
type TeamProcessRequest struct {
|
||
// Owner 账号列表
|
||
Owners []TeamOwner `json:"owners"`
|
||
// 配置
|
||
MembersPerTeam int `json:"members_per_team"` // 每个 Team 的成员数
|
||
ConcurrentTeams int `json:"concurrent_teams"` // 并发 Team 数量
|
||
ConcurrentS2A int `json:"concurrent_s2a"` // 入库并发数(默认2)
|
||
BrowserType string `json:"browser_type"` // "chromedp" 或 "rod"
|
||
Headless bool `json:"headless"` // 是否无头模式
|
||
Proxy string `json:"proxy"` // 代理设置
|
||
IncludeOwner bool `json:"include_owner"` // 母号也入库到 S2A
|
||
ProcessCount int `json:"process_count"` // 处理数量,0表示全部
|
||
}
|
||
|
||
// TeamProcessResult is the outcome of processing a single team.
type TeamProcessResult struct {
	TeamIndex    int      `json:"team_index"`
	OwnerEmail   string   `json:"owner_email"`
	TeamID       string   `json:"team_id"`
	Registered   int      `json:"registered"`
	AddedToS2A   int      `json:"added_to_s2a"`
	MemberEmails []string `json:"member_emails"`
	Errors       []string `json:"errors"`
	DurationMs   int64    `json:"duration_ms"`
}
|
||
|
||
// TeamProcessState 处理状态
|
||
type TeamProcessState struct {
|
||
Running bool `json:"running"`
|
||
StartedAt time.Time `json:"started_at"`
|
||
TotalTeams int `json:"total_teams"`
|
||
Completed int32 `json:"completed"`
|
||
Results []TeamProcessResult `json:"results"`
|
||
mu sync.Mutex
|
||
}
|
||
|
||
var teamProcessState = &TeamProcessState{}
|
||
|
||
// HandleTeamProcess POST /api/team/process - 启动 Team 批量处理
|
||
func HandleTeamProcess(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
|
||
return
|
||
}
|
||
|
||
// 检查是否正在运行
|
||
if teamProcessState.Running {
|
||
Error(w, http.StatusConflict, "已有任务正在运行")
|
||
return
|
||
}
|
||
|
||
var req TeamProcessRequest
|
||
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
|
||
Error(w, http.StatusBadRequest, "请求格式错误")
|
||
return
|
||
}
|
||
|
||
// 如果没有传入 owners,从数据库获取待处理的母号
|
||
if len(req.Owners) == 0 {
|
||
pendingOwners, err := database.Instance.GetPendingOwners()
|
||
if err != nil {
|
||
Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err))
|
||
return
|
||
}
|
||
if len(pendingOwners) == 0 {
|
||
Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件")
|
||
return
|
||
}
|
||
// 转换为请求格式(包含已存储的 account_id)
|
||
for _, o := range pendingOwners {
|
||
req.Owners = append(req.Owners, TeamOwner{
|
||
Email: o.Email,
|
||
Password: o.Password,
|
||
Token: o.Token,
|
||
AccountID: o.AccountID, // 直接使用数据库中存储的 account_id
|
||
})
|
||
}
|
||
logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team")
|
||
}
|
||
|
||
// 根据 ProcessCount 限制处理数量
|
||
if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) {
|
||
req.Owners = req.Owners[:req.ProcessCount]
|
||
logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team")
|
||
}
|
||
|
||
if req.MembersPerTeam <= 0 {
|
||
req.MembersPerTeam = 4
|
||
}
|
||
if req.ConcurrentTeams <= 0 {
|
||
req.ConcurrentTeams = len(req.Owners)
|
||
}
|
||
if req.ConcurrentTeams > len(req.Owners) {
|
||
req.ConcurrentTeams = len(req.Owners)
|
||
}
|
||
if req.BrowserType == "" {
|
||
req.BrowserType = "chromedp" // 默认使用 Chromedp
|
||
}
|
||
if req.ConcurrentS2A <= 0 {
|
||
req.ConcurrentS2A = 2 // 默认入库并发数为 2
|
||
}
|
||
if req.ConcurrentS2A > 4 {
|
||
req.ConcurrentS2A = 4 // 最大入库并发数为 4(避免浏览器资源耗尽)
|
||
}
|
||
if req.Proxy == "" && config.Global != nil {
|
||
req.Proxy = config.Global.GetProxy() // 使用新的代理获取方法
|
||
}
|
||
if req.Proxy != "" {
|
||
normalized, err := proxyutil.Normalize(req.Proxy)
|
||
if err != nil {
|
||
Error(w, http.StatusBadRequest, fmt.Sprintf("代理格式错误: %v", err))
|
||
return
|
||
}
|
||
req.Proxy = normalized
|
||
}
|
||
|
||
// 初始化状态
|
||
teamProcessState.Running = true
|
||
teamProcessState.StartedAt = time.Now()
|
||
teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理
|
||
teamProcessState.Completed = 0
|
||
teamProcessState.Results = make([]TeamProcessResult, 0, len(req.Owners))
|
||
|
||
// 异步执行
|
||
go runTeamProcess(req)
|
||
|
||
Success(w, map[string]interface{}{
|
||
"message": "任务已启动",
|
||
"total_teams": len(req.Owners),
|
||
"concurrent_teams": req.ConcurrentTeams,
|
||
"started_at": teamProcessState.StartedAt,
|
||
})
|
||
}
|
||
|
||
// HandleTeamProcessStatus GET /api/team/status - 获取处理状态
|
||
func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodGet {
|
||
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
|
||
return
|
||
}
|
||
|
||
teamProcessState.mu.Lock()
|
||
defer teamProcessState.mu.Unlock()
|
||
|
||
Success(w, map[string]interface{}{
|
||
"running": teamProcessState.Running,
|
||
"started_at": teamProcessState.StartedAt,
|
||
"total_teams": teamProcessState.TotalTeams,
|
||
"completed": teamProcessState.Completed,
|
||
"results": teamProcessState.Results,
|
||
"elapsed_ms": time.Since(teamProcessState.StartedAt).Milliseconds(),
|
||
})
|
||
}
|
||
|
||
// HandleTeamProcessStop POST /api/team/stop - 停止处理
|
||
func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodPost {
|
||
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
|
||
return
|
||
}
|
||
|
||
teamProcessState.Running = false
|
||
Success(w, map[string]string{"message": "已发送停止信号"})
|
||
}
|
||
|
||
// HandleBatchHistory GET /api/batch/history - 获取历史批次(分页)
|
||
func HandleBatchHistory(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodGet {
|
||
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
|
||
return
|
||
}
|
||
|
||
if database.Instance == nil {
|
||
Error(w, http.StatusInternalServerError, "数据库未初始化")
|
||
return
|
||
}
|
||
|
||
// 获取分页参数
|
||
page := 1
|
||
pageSize := 5
|
||
if p := r.URL.Query().Get("page"); p != "" {
|
||
if v, err := fmt.Sscanf(p, "%d", &page); err == nil && v > 0 {
|
||
// page已设置
|
||
}
|
||
}
|
||
if ps := r.URL.Query().Get("page_size"); ps != "" {
|
||
if v, err := fmt.Sscanf(ps, "%d", &pageSize); err == nil && v > 0 {
|
||
// pageSize已设置
|
||
}
|
||
}
|
||
|
||
runs, total, err := database.Instance.GetBatchRunsWithPagination(page, pageSize)
|
||
if err != nil {
|
||
Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
|
||
return
|
||
}
|
||
|
||
totalPages := (total + pageSize - 1) / pageSize
|
||
|
||
Success(w, map[string]interface{}{
|
||
"runs": runs,
|
||
"total": total,
|
||
"page": page,
|
||
"page_size": pageSize,
|
||
"total_pages": totalPages,
|
||
})
|
||
}
|
||
|
||
// HandleBatchDetail GET /api/batch/detail?id=xxx - 获取批次详情(包含Team结果)
|
||
func HandleBatchDetail(w http.ResponseWriter, r *http.Request) {
|
||
if r.Method != http.MethodGet {
|
||
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
|
||
return
|
||
}
|
||
|
||
if database.Instance == nil {
|
||
Error(w, http.StatusInternalServerError, "数据库未初始化")
|
||
return
|
||
}
|
||
|
||
idStr := r.URL.Query().Get("id")
|
||
if idStr == "" {
|
||
Error(w, http.StatusBadRequest, "缺少批次ID")
|
||
return
|
||
}
|
||
|
||
var batchID int64
|
||
if _, err := fmt.Sscanf(idStr, "%d", &batchID); err != nil {
|
||
Error(w, http.StatusBadRequest, "无效的批次ID")
|
||
return
|
||
}
|
||
|
||
run, results, err := database.Instance.GetBatchRunWithResults(batchID)
|
||
if err != nil {
|
||
Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
|
||
return
|
||
}
|
||
|
||
Success(w, map[string]interface{}{
|
||
"batch": run,
|
||
"results": results,
|
||
})
|
||
}
|
||
|
||
// runTeamProcess 执行 Team 批量处理 - 使用工作池模式
|
||
func runTeamProcess(req TeamProcessRequest) {
|
||
totalOwners := len(req.Owners)
|
||
workerCount := req.ConcurrentTeams // 同时运行的 worker 数量
|
||
if workerCount > totalOwners {
|
||
workerCount = totalOwners
|
||
}
|
||
if workerCount <= 0 {
|
||
workerCount = 2 // 默认 2 个并发
|
||
}
|
||
|
||
// 创建批次记录
|
||
var batchID int64
|
||
if database.Instance != nil {
|
||
var err error
|
||
batchID, err = database.Instance.CreateBatchRun(totalOwners)
|
||
if err != nil {
|
||
logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team")
|
||
}
|
||
}
|
||
|
||
// 统计变量(在 defer 中使用)
|
||
var totalRegistered, totalAddedToS2A int
|
||
var allErrors []string
|
||
|
||
// 确保任务结束时更新状态和批次记录
|
||
defer func() {
|
||
teamProcessState.Running = false
|
||
|
||
// 无论任务是正常完成还是异常中断,都更新批次记录状态
|
||
if database.Instance != nil && batchID > 0 {
|
||
errorsStr := ""
|
||
if len(allErrors) > 0 {
|
||
// 只保留前10条错误
|
||
if len(allErrors) > 10 {
|
||
allErrors = allErrors[:10]
|
||
}
|
||
errorsStr = fmt.Sprintf("%v", allErrors)
|
||
}
|
||
if err := database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr); err != nil {
|
||
logger.Error(fmt.Sprintf("更新批次记录失败: %v", err), "", "team")
|
||
}
|
||
}
|
||
}()
|
||
|
||
logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")
|
||
|
||
// 任务队列
|
||
taskChan := make(chan int, totalOwners)
|
||
resultChan := make(chan TeamProcessResult, totalOwners)
|
||
|
||
var wg sync.WaitGroup
|
||
|
||
// 启动 worker
|
||
for w := 0; w < workerCount; w++ {
|
||
wg.Add(1)
|
||
go func(workerID int) {
|
||
defer wg.Done()
|
||
for idx := range taskChan {
|
||
if !teamProcessState.Running {
|
||
return
|
||
}
|
||
result := processSingleTeam(idx, req)
|
||
resultChan <- result
|
||
atomic.AddInt32(&teamProcessState.Completed, 1)
|
||
}
|
||
}(w)
|
||
}
|
||
|
||
// 发送任务
|
||
go func() {
|
||
for i := 0; i < totalOwners; i++ {
|
||
if !teamProcessState.Running {
|
||
break
|
||
}
|
||
taskChan <- i
|
||
}
|
||
close(taskChan)
|
||
}()
|
||
|
||
// 等待完成并收集结果
|
||
go func() {
|
||
wg.Wait()
|
||
close(resultChan)
|
||
}()
|
||
|
||
// 收集结果并统计
|
||
for result := range resultChan {
|
||
teamProcessState.mu.Lock()
|
||
teamProcessState.Results = append(teamProcessState.Results, result)
|
||
teamProcessState.mu.Unlock()
|
||
|
||
totalRegistered += result.Registered
|
||
totalAddedToS2A += result.AddedToS2A
|
||
allErrors = append(allErrors, result.Errors...)
|
||
|
||
// 保存单个Team结果到数据库
|
||
if database.Instance != nil && batchID > 0 {
|
||
dbResult := database.BatchTeamResult{
|
||
TeamIndex: result.TeamIndex,
|
||
OwnerEmail: result.OwnerEmail,
|
||
TeamID: result.TeamID,
|
||
Registered: result.Registered,
|
||
AddedToS2A: result.AddedToS2A,
|
||
MemberEmails: result.MemberEmails,
|
||
Errors: result.Errors,
|
||
DurationMs: result.DurationMs,
|
||
}
|
||
if err := database.Instance.SaveBatchTeamResult(batchID, dbResult); err != nil {
|
||
logger.Error(fmt.Sprintf("保存Team结果失败: %v", err), result.OwnerEmail, "team")
|
||
}
|
||
}
|
||
}
|
||
|
||
// 计算成功率
|
||
expectedTotal := totalOwners * req.MembersPerTeam
|
||
successRate := float64(0)
|
||
if expectedTotal > 0 {
|
||
successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100
|
||
}
|
||
|
||
logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%",
|
||
teamProcessState.Completed, totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team")
|
||
}
|
||
|
||
// processSingleTeam 处理单个 Team
|
||
func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResult) {
|
||
startTime := time.Now()
|
||
owner := req.Owners[idx]
|
||
result = TeamProcessResult{
|
||
TeamIndex: idx + 1,
|
||
OwnerEmail: owner.Email,
|
||
MemberEmails: make([]string, 0),
|
||
Errors: make([]string, 0),
|
||
}
|
||
|
||
// 固定宽度的 Team 编号 (支持到 Team 99)
|
||
logPrefix := fmt.Sprintf("[Team %2d]", idx+1)
|
||
|
||
// panic 恢复,防止单个 Team 处理崩溃影响整个任务
|
||
defer func() {
|
||
if r := recover(); r != nil {
|
||
logger.Error(fmt.Sprintf("%s 处理异常: %v", logPrefix, r), owner.Email, "team")
|
||
result.Errors = append(result.Errors, fmt.Sprintf("处理异常: %v", r))
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
// 恢复为 valid 允许重试
|
||
if database.Instance != nil {
|
||
database.Instance.MarkOwnerAsFailed(owner.Email)
|
||
}
|
||
}
|
||
}()
|
||
|
||
logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")
|
||
|
||
// 标记 owner 为处理中
|
||
if database.Instance != nil {
|
||
database.Instance.MarkOwnerAsProcessing(owner.Email)
|
||
}
|
||
|
||
// 处理失败时的清理函数
|
||
markOwnerResult := func(success bool) {
|
||
if database.Instance != nil {
|
||
if success {
|
||
database.Instance.MarkOwnerAsUsed(owner.Email)
|
||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||
logger.Info(fmt.Sprintf("%s 母号已使用并删除: %s", logPrefix, owner.Email), owner.Email, "team")
|
||
} else {
|
||
// 失败时恢复为 valid,允许重试
|
||
database.Instance.MarkOwnerAsFailed(owner.Email)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Step 1: 获取 Team ID(优先使用已存储的 account_id)
|
||
var teamID string
|
||
inviter := invite.NewWithProxy(owner.Token, req.Proxy)
|
||
|
||
if owner.AccountID != "" {
|
||
// 直接使用数据库中存储的 account_id
|
||
teamID = owner.AccountID
|
||
inviter.SetAccountID(teamID) // 必须设置到 inviter 中
|
||
logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team")
|
||
} else {
|
||
// 如果没有存储,才请求 API 获取
|
||
var err error
|
||
teamID, err = inviter.GetAccountID()
|
||
if err != nil {
|
||
errStr := err.Error()
|
||
result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
|
||
|
||
// Token 过期或无效,标记为 invalid 不再重试
|
||
if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") ||
|
||
strings.Contains(errStr, "unauthorized") || strings.Contains(errStr, "invalid") {
|
||
logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team")
|
||
if database.Instance != nil {
|
||
database.Instance.MarkOwnerAsInvalid(owner.Email)
|
||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||
logger.Info(fmt.Sprintf("%s 母号无效已删除: %s", logPrefix, owner.Email), owner.Email, "team")
|
||
}
|
||
return result
|
||
}
|
||
|
||
markOwnerResult(false)
|
||
return result
|
||
}
|
||
logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team")
|
||
}
|
||
result.TeamID = teamID
|
||
|
||
// Step 2: 测试邀请功能(检测 Team 是否被封禁或已满)
|
||
testEmail := mail.GenerateEmail()
|
||
if err := inviter.SendInvites([]string{testEmail}); err != nil {
|
||
errStr := err.Error()
|
||
|
||
// 检测 Team 已达邀请上限(401 或 maximum number of seats)
|
||
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
|
||
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team")
|
||
if database.Instance != nil {
|
||
database.Instance.MarkOwnerAsUsed(owner.Email)
|
||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||
logger.Info(fmt.Sprintf("%s 母号已使用并删除(邀请已满): %s", logPrefix, owner.Email), owner.Email, "team")
|
||
}
|
||
result.Errors = append(result.Errors, "Team 邀请已满")
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
return result
|
||
}
|
||
|
||
// 检测 Team 被封禁
|
||
if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") ||
|
||
strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") ||
|
||
strings.Contains(errStr, "deactivated") {
|
||
logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
|
||
if database.Instance != nil {
|
||
database.Instance.MarkOwnerAsInvalid(owner.Email)
|
||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||
logger.Info(fmt.Sprintf("%s 母号无效已删除(Team被封禁): %s", logPrefix, owner.Email), owner.Email, "team")
|
||
}
|
||
result.Errors = append(result.Errors, "Team 被封禁")
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
return result
|
||
}
|
||
|
||
// 其他邀请错误,继续尝试
|
||
logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team")
|
||
}
|
||
|
||
// Step 3: 并发注册成员
|
||
// 每个成员:邀请 → 注册,失败重试1次
|
||
// Team 有4次额外补救机会
|
||
type MemberAccount struct {
|
||
Email string
|
||
Password string
|
||
Success bool
|
||
}
|
||
children := make([]MemberAccount, req.MembersPerTeam)
|
||
var memberMu sync.Mutex
|
||
var memberWg sync.WaitGroup
|
||
|
||
// 共享标志:Team 邀请已满,所有 goroutine 应停止
|
||
var teamExhausted int32
|
||
|
||
// 检查 Team 是否已满的辅助函数
|
||
isTeamExhausted := func() bool {
|
||
return atomic.LoadInt32(&teamExhausted) == 1
|
||
}
|
||
|
||
// 标记 Team 已满
|
||
markTeamExhausted := func() {
|
||
if atomic.CompareAndSwapInt32(&teamExhausted, 0, 1) {
|
||
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team")
|
||
if database.Instance != nil {
|
||
database.Instance.MarkOwnerAsUsed(owner.Email)
|
||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||
logger.Info(fmt.Sprintf("%s 母号已使用并删除(Team耗尽): %s", logPrefix, owner.Email), owner.Email, "team")
|
||
}
|
||
}
|
||
}
|
||
|
||
// 注册单个成员的函数(带1次重试)
|
||
registerMember := func(memberIdx int, email, password string) bool {
|
||
name := register.GenerateName()
|
||
birthdate := register.GenerateBirthdate()
|
||
memberLogPrefix := fmt.Sprintf("%s [成员 %d]", logPrefix, memberIdx+1)
|
||
regStartTime := time.Now()
|
||
|
||
for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次(首次+1次重试)
|
||
// 检查是否应该停止
|
||
if !teamProcessState.Running || isTeamExhausted() {
|
||
return false
|
||
}
|
||
|
||
currentEmail := email
|
||
currentPassword := password
|
||
if attempt > 0 {
|
||
// 重试时使用新邮箱
|
||
currentEmail = mail.GenerateEmail()
|
||
currentPassword = register.GeneratePassword()
|
||
logger.Warning(fmt.Sprintf("%s 重试 (第%d次), 新邮箱: %s", memberLogPrefix, attempt+1, currentEmail), currentEmail, "team")
|
||
}
|
||
|
||
// 发送邀请
|
||
if err := inviter.SendInvites([]string{currentEmail}); err != nil {
|
||
errStr := err.Error()
|
||
logger.Error(fmt.Sprintf("%s 邀请失败: %v", memberLogPrefix, err), currentEmail, "team")
|
||
|
||
// 检测 Team 已达邀请上限(401 或 maximum number of seats)
|
||
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
|
||
markTeamExhausted()
|
||
return false
|
||
}
|
||
continue
|
||
}
|
||
|
||
// 再次检查是否应该停止(邀请期间其他 goroutine 可能已标记)
|
||
if isTeamExhausted() {
|
||
return false
|
||
}
|
||
|
||
// 注册
|
||
_, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy)
|
||
if err != nil {
|
||
logger.Error(fmt.Sprintf("%s [注册失败] %v", memberLogPrefix, err), currentEmail, "team")
|
||
continue
|
||
}
|
||
|
||
// 成功
|
||
regDuration := time.Since(regStartTime)
|
||
memberMu.Lock()
|
||
children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true}
|
||
memberMu.Unlock()
|
||
logger.Success(fmt.Sprintf("%s ✓ 注册成功 (耗时: %.1fs)", memberLogPrefix, regDuration.Seconds()), currentEmail, "team")
|
||
return true
|
||
}
|
||
return false
|
||
}
|
||
|
||
// 第一轮:并发注册4个成员
|
||
logger.Info(fmt.Sprintf("%s ════════ 开始注册阶段 ════════ 目标: %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team")
|
||
regPhaseStartTime := time.Now()
|
||
for i := 0; i < req.MembersPerTeam; i++ {
|
||
memberWg.Add(1)
|
||
go func(idx int) {
|
||
defer memberWg.Done()
|
||
// 检查是否应该停止
|
||
if isTeamExhausted() {
|
||
return
|
||
}
|
||
email := mail.GenerateEmail()
|
||
password := register.GeneratePassword()
|
||
logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s | 密码: %s", logPrefix, idx+1, email, password), email, "team")
|
||
registerMember(idx, email, password)
|
||
}(i)
|
||
}
|
||
memberWg.Wait()
|
||
|
||
// 如果 Team 已满,直接跳过补救和后续处理
|
||
if isTeamExhausted() {
|
||
result.Errors = append(result.Errors, "Team 邀请已满")
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
return result
|
||
}
|
||
|
||
// 统计失败的成员
|
||
failedSlots := make([]int, 0)
|
||
for i, c := range children {
|
||
if !c.Success {
|
||
failedSlots = append(failedSlots, i)
|
||
}
|
||
}
|
||
|
||
// 第二轮:Team 有 4 次额外补救机会(如果 Team 未满)
|
||
teamRetries := 4
|
||
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running && !isTeamExhausted(); retry++ {
|
||
slotIdx := failedSlots[0]
|
||
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
|
||
|
||
email := mail.GenerateEmail()
|
||
password := register.GeneratePassword()
|
||
if registerMember(slotIdx, email, password) {
|
||
failedSlots = failedSlots[1:] // 成功,移除这个槽位
|
||
}
|
||
}
|
||
|
||
// 补救后再次检查 Team 是否已满
|
||
if isTeamExhausted() {
|
||
result.Errors = append(result.Errors, "Team 邀请已满")
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
return result
|
||
}
|
||
|
||
// 统计注册成功数
|
||
registeredChildren := make([]MemberAccount, 0)
|
||
for _, c := range children {
|
||
if c.Success {
|
||
registeredChildren = append(registeredChildren, c)
|
||
result.MemberEmails = append(result.MemberEmails, c.Email)
|
||
result.Registered++
|
||
}
|
||
}
|
||
|
||
if len(failedSlots) > 0 {
|
||
result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots)))
|
||
}
|
||
regPhaseDuration := time.Since(regPhaseStartTime)
|
||
logger.Info(fmt.Sprintf("%s ════════ 注册阶段完成 ════════ 成功: %d/%d, 耗时: %.1fs", logPrefix, result.Registered, req.MembersPerTeam, regPhaseDuration.Seconds()), owner.Email, "team")
|
||
|
||
// 如果没有任何成员注册成功,跳过入库步骤
|
||
if len(registeredChildren) == 0 {
|
||
logger.Warning(fmt.Sprintf("%s 没有成员注册成功,跳过入库步骤", logPrefix), owner.Email, "team")
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
markOwnerResult(false)
|
||
return result
|
||
}
|
||
|
||
// Step 4: S2A 授权入库(成员)- 并发入库
|
||
logger.Info(fmt.Sprintf("%s ════════ 开始入库阶段 ════════ 共 %d 个成员, 并发数: %d", logPrefix, len(registeredChildren), req.ConcurrentS2A), owner.Email, "team")
|
||
s2aStartTime := time.Now()
|
||
|
||
// 入库结果
|
||
type S2AResult struct {
|
||
Index int
|
||
Email string
|
||
Success bool
|
||
Error string
|
||
}
|
||
|
||
s2aResults := make(chan S2AResult, len(registeredChildren))
|
||
s2aSem := make(chan struct{}, req.ConcurrentS2A) // 并发控制信号量
|
||
|
||
var s2aWg sync.WaitGroup
|
||
|
||
for i, child := range registeredChildren {
|
||
if !teamProcessState.Running {
|
||
break
|
||
}
|
||
|
||
s2aWg.Add(1)
|
||
go func(memberIdx int, memberChild MemberAccount) {
|
||
defer s2aWg.Done()
|
||
|
||
// 获取信号量
|
||
s2aSem <- struct{}{}
|
||
defer func() { <-s2aSem }()
|
||
|
||
memberStartTime := time.Now()
|
||
memberLogPrefix := fmt.Sprintf("%s [成员 %d]", logPrefix, memberIdx+1)
|
||
|
||
logger.Info(fmt.Sprintf("%s 开始入库 | 邮箱: %s", memberLogPrefix, memberChild.Email), memberChild.Email, "team")
|
||
|
||
var s2aSuccess bool
|
||
var lastError string
|
||
|
||
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
|
||
if attempt > 0 {
|
||
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", memberLogPrefix, attempt+1), memberChild.Email, "team")
|
||
}
|
||
|
||
// 创建日志回调(输出关键日志和调试信息)
|
||
authLogger := auth.NewAuthLogger(memberChild.Email, logPrefix, memberIdx+1, func(entry auth.AuthLogEntry) {
|
||
if entry.IsError {
|
||
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, entry.Message), memberChild.Email, "team")
|
||
} else {
|
||
// 输出关键步骤:导航、输入、完成等
|
||
switch entry.Step {
|
||
case auth.StepNavigate, auth.StepInputEmail, auth.StepInputPassword,
|
||
auth.StepComplete, auth.StepConsent, auth.StepSelectWorkspace:
|
||
logger.Info(fmt.Sprintf("%s %s", memberLogPrefix, entry.Message), memberChild.Email, "team")
|
||
}
|
||
}
|
||
})
|
||
|
||
// 获取授权 URL
|
||
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
|
||
if err != nil {
|
||
lastError = fmt.Sprintf("获取授权URL失败: %v", err)
|
||
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberChild.Email, "team")
|
||
continue
|
||
}
|
||
|
||
// 根据配置选择浏览器自动化
|
||
var code string
|
||
// 根据全局配置决定授权方式
|
||
if config.Global.AuthMethod == "api" {
|
||
// 使用纯 API 模式(CodexAuth)- 使用 S2A 生成的授权 URL
|
||
code, err = auth.CompleteWithCodexAPI(memberChild.Email, memberChild.Password, teamID, s2aResp.Data.AuthURL, s2aResp.Data.SessionID, req.Proxy, authLogger)
|
||
} else if req.BrowserType == "rod" {
|
||
code, err = auth.CompleteWithRodLogged(s2aResp.Data.AuthURL, memberChild.Email, memberChild.Password, teamID, req.Headless, req.Proxy, authLogger)
|
||
} else {
|
||
code, err = auth.CompleteWithChromedpLogged(s2aResp.Data.AuthURL, memberChild.Email, memberChild.Password, teamID, req.Headless, req.Proxy, authLogger)
|
||
}
|
||
if err != nil {
|
||
lastError = fmt.Sprintf("浏览器授权失败: %v", err)
|
||
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberChild.Email, "team")
|
||
continue
|
||
}
|
||
|
||
// 提交到 S2A
|
||
_, err = auth.SubmitS2AOAuth(
|
||
config.Global.S2AApiBase,
|
||
config.Global.S2AAdminKey,
|
||
s2aResp.Data.SessionID,
|
||
code,
|
||
memberChild.Email,
|
||
config.Global.Concurrency,
|
||
config.Global.Priority,
|
||
config.Global.GroupIDs,
|
||
config.Global.ProxyID,
|
||
)
|
||
if err != nil {
|
||
lastError = fmt.Sprintf("S2A提交失败: %v", err)
|
||
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberChild.Email, "team")
|
||
continue
|
||
}
|
||
|
||
s2aSuccess = true
|
||
memberDuration := time.Since(memberStartTime)
|
||
logger.Success(fmt.Sprintf("%s ✓ 入库成功 (总耗时: %.1fs)", memberLogPrefix, memberDuration.Seconds()), memberChild.Email, "team")
|
||
break
|
||
}
|
||
|
||
s2aResults <- S2AResult{
|
||
Index: memberIdx,
|
||
Email: memberChild.Email,
|
||
Success: s2aSuccess,
|
||
Error: lastError,
|
||
}
|
||
}(i, child)
|
||
}
|
||
|
||
// 等待所有入库完成
|
||
go func() {
|
||
s2aWg.Wait()
|
||
close(s2aResults)
|
||
}()
|
||
|
||
// 收集入库结果
|
||
for s2aRes := range s2aResults {
|
||
if s2aRes.Success {
|
||
result.AddedToS2A++
|
||
} else {
|
||
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库失败: %s", s2aRes.Index+1, s2aRes.Error))
|
||
}
|
||
}
|
||
|
||
s2aDuration := time.Since(s2aStartTime)
|
||
logger.Info(fmt.Sprintf("%s ════════ 入库阶段完成 ════════ 成功: %d/%d, 耗时: %.1fs", logPrefix, result.AddedToS2A, len(registeredChildren), s2aDuration.Seconds()), owner.Email, "team")
|
||
|
||
// Step 5: 母号也入库(如果开启)- 带重试
|
||
if req.IncludeOwner && teamProcessState.Running {
|
||
ownerLogPrefix := fmt.Sprintf("%s [母号 ]", logPrefix)
|
||
ownerStartTime := time.Now()
|
||
logger.Info(fmt.Sprintf("%s 开始母号入库...", ownerLogPrefix), owner.Email, "team")
|
||
|
||
var ownerSuccess bool
|
||
var lastError string
|
||
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
|
||
if attempt > 0 {
|
||
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", ownerLogPrefix, attempt+1), owner.Email, "team")
|
||
}
|
||
|
||
// 创建日志回调(输出关键日志和调试信息)
|
||
authLogger := auth.NewAuthLogger(owner.Email, logPrefix, 0, func(entry auth.AuthLogEntry) {
|
||
if entry.IsError {
|
||
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, entry.Message), owner.Email, "team")
|
||
} else {
|
||
switch entry.Step {
|
||
case auth.StepNavigate, auth.StepInputEmail, auth.StepInputPassword,
|
||
auth.StepComplete, auth.StepConsent, auth.StepSelectWorkspace:
|
||
logger.Info(fmt.Sprintf("%s %s", ownerLogPrefix, entry.Message), owner.Email, "team")
|
||
}
|
||
}
|
||
})
|
||
|
||
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
|
||
if err != nil {
|
||
lastError = fmt.Sprintf("获取授权URL失败: %v", err)
|
||
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team")
|
||
continue
|
||
}
|
||
|
||
var code string
|
||
// 根据全局配置决定授权方式
|
||
if config.Global.AuthMethod == "api" {
|
||
// 使用纯 API 模式(CodexAuth)- 使用 S2A 生成的授权 URL
|
||
code, err = auth.CompleteWithCodexAPI(owner.Email, owner.Password, teamID, s2aResp.Data.AuthURL, s2aResp.Data.SessionID, req.Proxy, authLogger)
|
||
} else if req.BrowserType == "rod" {
|
||
code, err = auth.CompleteWithRodLogged(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy, authLogger)
|
||
} else {
|
||
code, err = auth.CompleteWithChromedpLogged(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy, authLogger)
|
||
}
|
||
if err != nil {
|
||
lastError = fmt.Sprintf("浏览器授权失败: %v", err)
|
||
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team")
|
||
continue
|
||
}
|
||
|
||
// 提交到 S2A
|
||
_, err = auth.SubmitS2AOAuth(
|
||
config.Global.S2AApiBase,
|
||
config.Global.S2AAdminKey,
|
||
s2aResp.Data.SessionID,
|
||
code,
|
||
owner.Email,
|
||
config.Global.Concurrency,
|
||
config.Global.Priority,
|
||
config.Global.GroupIDs,
|
||
config.Global.ProxyID,
|
||
)
|
||
if err != nil {
|
||
lastError = fmt.Sprintf("S2A提交失败: %v", err)
|
||
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team")
|
||
continue
|
||
}
|
||
|
||
ownerSuccess = true
|
||
result.AddedToS2A++
|
||
ownerDuration := time.Since(ownerStartTime)
|
||
logger.Success(fmt.Sprintf("%s ✓ 入库成功 (总耗时: %.1fs)", ownerLogPrefix, ownerDuration.Seconds()), owner.Email, "team")
|
||
break
|
||
}
|
||
|
||
if !ownerSuccess {
|
||
result.Errors = append(result.Errors, fmt.Sprintf("母号入库失败: %s", lastError))
|
||
}
|
||
}
|
||
|
||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||
logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team")
|
||
|
||
// 根据入库结果标记 owner 状态
|
||
// 只要有任何一个账号入库成功,就标记为已使用
|
||
markOwnerResult(result.AddedToS2A > 0)
|
||
|
||
return result
|
||
}
|
||
|
||
// registerWithTimeout 带超时的注册(遇到 403 会换指纹重试)
|
||
func registerWithTimeout(email, password, name, birthdate, proxy string) (*register.ChatGPTReg, error) {
|
||
const maxInitRetries = 3
|
||
var reg *register.ChatGPTReg
|
||
var initErr error
|
||
|
||
// 初始化阶段:遇到 403 换指纹重试
|
||
for attempt := 0; attempt < maxInitRetries; attempt++ {
|
||
var err error
|
||
reg, err = register.New(proxy)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
if err := reg.InitSession(); err != nil {
|
||
initErr = err
|
||
// 检查是否是 403 错误,换指纹重试
|
||
if strings.Contains(err.Error(), "403") {
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("初始化失败: %v", err)
|
||
}
|
||
|
||
if err := reg.GetAuthorizeURL(email); err != nil {
|
||
// 403 也可能在这里出现
|
||
if strings.Contains(err.Error(), "403") {
|
||
initErr = err
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("获取授权URL失败: %v", err)
|
||
}
|
||
|
||
if err := reg.StartAuthorize(); err != nil {
|
||
if strings.Contains(err.Error(), "403") {
|
||
initErr = err
|
||
continue
|
||
}
|
||
return nil, fmt.Errorf("启动授权失败: %v", err)
|
||
}
|
||
|
||
// 初始化成功,跳出重试循环
|
||
initErr = nil
|
||
break
|
||
}
|
||
|
||
if initErr != nil {
|
||
return nil, fmt.Errorf("初始化失败(重试%d次): %v", maxInitRetries, initErr)
|
||
}
|
||
|
||
if err := reg.Register(email, password); err != nil {
|
||
return nil, fmt.Errorf("注册失败: %v", err)
|
||
}
|
||
|
||
if err := reg.SendVerificationEmail(); err != nil {
|
||
return nil, fmt.Errorf("发送邮件失败: %v", err)
|
||
}
|
||
|
||
// 短超时获取验证码
|
||
otpCode, err := mail.GetVerificationCode(email, 5*time.Second)
|
||
if err != nil {
|
||
otpCode, err = mail.GetVerificationCode(email, 15*time.Second)
|
||
if err != nil {
|
||
return nil, fmt.Errorf("验证码获取超时")
|
||
}
|
||
}
|
||
|
||
if err := reg.ValidateOTP(otpCode); err != nil {
|
||
return nil, fmt.Errorf("OTP验证失败: %v", err)
|
||
}
|
||
|
||
if err := reg.CreateAccount(name, birthdate); err != nil {
|
||
return nil, fmt.Errorf("创建账户失败: %v", err)
|
||
}
|
||
|
||
_ = reg.GetSessionToken()
|
||
return reg, nil
|
||
}
|