Files
codexautopool/backend/internal/api/team_process.go

834 lines
26 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package api
import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"codex-pool/internal/auth"
"codex-pool/internal/config"
"codex-pool/internal/database"
"codex-pool/internal/invite"
"codex-pool/internal/logger"
"codex-pool/internal/mail"
"codex-pool/internal/register"
)
// TeamOwner describes one team owner ("parent") account as carried in a
// TeamProcessRequest.
type TeamOwner struct {
Email string `json:"email"`
Password string `json:"password"`
Token string `json:"token"`
AccountID string `json:"account_id"` // previously stored account_id; when non-empty, processSingleTeam uses it directly as the team ID
}
// TeamProcessRequest is the JSON request body decoded by HandleTeamProcess.
type TeamProcessRequest struct {
// Owner accounts to process. When empty, HandleTeamProcess loads the
// pending owners from the database instead.
Owners []TeamOwner `json:"owners"`
// Settings
MembersPerTeam int `json:"members_per_team"` // number of members per team (HandleTeamProcess uses 4 when <= 0)
ConcurrentTeams int `json:"concurrent_teams"` // number of teams processed concurrently
BrowserType string `json:"browser_type"` // "chromedp" or "rod"
Headless bool `json:"headless"` // whether to run the browser in headless mode
Proxy string `json:"proxy"` // proxy setting
IncludeOwner bool `json:"include_owner"` // also add the owner account to S2A
ProcessCount int `json:"process_count"` // number of owners to process; 0 means all
}
// TeamProcessResult is the outcome of processing a single team.
type TeamProcessResult struct {
TeamIndex int `json:"team_index"` // 1-based position of the owner in the request (set to idx+1 by processSingleTeam)
OwnerEmail string `json:"owner_email"` // email of the owner account
TeamID string `json:"team_id"` // team ID (the owner's account_id)
Registered int `json:"registered"` // number of members registered successfully
AddedToS2A int `json:"added_to_s2a"` // number of accounts added to S2A (the owner is counted too when it is added)
MemberEmails []string `json:"member_emails"` // emails of the successfully registered members
Errors []string `json:"errors"` // error messages collected while processing this team
DurationMs int64 `json:"duration_ms"` // processing time in milliseconds
}
// TeamProcessState holds the progress of the current (or most recent) batch run.
type TeamProcessState struct {
Running bool `json:"running"` // true while a run is active; set to false to ask the workers to stop
StartedAt time.Time `json:"started_at"` // time the run was started
TotalTeams int `json:"total_teams"` // number of owners in the run
Completed int32 `json:"completed"` // teams finished so far; workers increment it with atomic.AddInt32
Results []TeamProcessResult `json:"results"` // per-team results; runTeamProcess appends while holding mu
mu sync.Mutex
}
// teamProcessState is the single package-level state shared by the team
// process handlers and the background run.
var teamProcessState = &TeamProcessState{}
// HandleTeamProcess handles POST /api/team/process and starts a batch run in
// the background.
//
// The request body is a TeamProcessRequest. When it carries no owners, the
// pending owners are loaded from the database. Missing settings are filled
// with defaults (4 members per team, one worker per owner, "chromedp", and the
// globally configured proxy). The handler answers immediately; progress is
// reported by the status endpoint.
//
// Responses:
//   - 405 when the method is not POST
//   - 409 when a run is already active
//   - 400 on a malformed body or when there is nothing to process
//   - 500 when the database is unavailable or the owner lookup fails
func HandleTeamProcess(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	// Cheap early rejection so a busy server does not decode the body or hit
	// the database. The authoritative check happens again below, right before
	// the state is claimed.
	teamProcessState.mu.Lock()
	busy := teamProcessState.Running
	teamProcessState.mu.Unlock()
	if busy {
		Error(w, http.StatusConflict, "已有任务正在运行")
		return
	}

	var req TeamProcessRequest
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		Error(w, http.StatusBadRequest, "请求格式错误")
		return
	}

	// No owners supplied: fall back to the pending owners stored in the database.
	if len(req.Owners) == 0 {
		// Without this guard a missing database would cause a nil dereference.
		if database.Instance == nil {
			Error(w, http.StatusInternalServerError, "数据库未初始化")
			return
		}
		pendingOwners, err := database.Instance.GetPendingOwners()
		if err != nil {
			Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err))
			return
		}
		if len(pendingOwners) == 0 {
			Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件")
			return
		}
		// Convert to the request shape, carrying over the stored account_id.
		req.Owners = make([]TeamOwner, 0, len(pendingOwners))
		for _, o := range pendingOwners {
			req.Owners = append(req.Owners, TeamOwner{
				Email:     o.Email,
				Password:  o.Password,
				Token:     o.Token,
				AccountID: o.AccountID,
			})
		}
		logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team")
	}

	// Honour ProcessCount by truncating the owner list.
	if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) {
		req.Owners = req.Owners[:req.ProcessCount]
		logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team")
	}

	if req.MembersPerTeam <= 0 {
		req.MembersPerTeam = 4
	}
	// Concurrency is clamped to the range 1..len(owners); unset means "all at once".
	if req.ConcurrentTeams <= 0 || req.ConcurrentTeams > len(req.Owners) {
		req.ConcurrentTeams = len(req.Owners)
	}
	if req.BrowserType == "" {
		req.BrowserType = "chromedp"
	}
	if req.Proxy == "" && config.Global != nil {
		req.Proxy = config.Global.GetProxy()
	}

	// Claim the shared state. Check and set happen under one lock so two
	// concurrent requests cannot both start a run.
	startedAt := time.Now()
	teamProcessState.mu.Lock()
	if teamProcessState.Running {
		teamProcessState.mu.Unlock()
		Error(w, http.StatusConflict, "已有任务正在运行")
		return
	}
	teamProcessState.Running = true
	teamProcessState.StartedAt = startedAt
	teamProcessState.TotalTeams = len(req.Owners)
	// Completed is updated atomically by the workers, so reset it the same way.
	atomic.StoreInt32(&teamProcessState.Completed, 0)
	teamProcessState.Results = make([]TeamProcessResult, 0, len(req.Owners))
	teamProcessState.mu.Unlock()

	// Run asynchronously; the response below only confirms the start.
	go runTeamProcess(req)

	Success(w, map[string]interface{}{
		"message":          "任务已启动",
		"total_teams":      len(req.Owners),
		"concurrent_teams": req.ConcurrentTeams,
		"started_at":       startedAt, // local copy: avoids re-reading shared state
	})
}
// HandleTeamProcessStatus handles GET /api/team/status and reports the state
// of the current (or most recent) batch run.
//
// The response contains running, started_at, total_teams, completed, results
// and elapsed_ms. elapsed_ms is 0 until a run has been started at least once.
// Any method other than GET is answered with 405.
func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
		return
	}

	// The lock is held while the response is written so that Results cannot be
	// appended to in the middle of encoding.
	teamProcessState.mu.Lock()
	defer teamProcessState.mu.Unlock()

	// time.Since on the zero time yields a meaningless, saturated duration, so
	// only compute the elapsed time once a run has actually started.
	var elapsedMs int64
	if !teamProcessState.StartedAt.IsZero() {
		elapsedMs = time.Since(teamProcessState.StartedAt).Milliseconds()
	}

	Success(w, map[string]interface{}{
		"running":     teamProcessState.Running,
		"started_at":  teamProcessState.StartedAt,
		"total_teams": teamProcessState.TotalTeams,
		// Workers bump this counter with atomic.AddInt32, so it must be read atomically.
		"completed":  atomic.LoadInt32(&teamProcessState.Completed),
		"results":    teamProcessState.Results,
		"elapsed_ms": elapsedMs,
	})
}
// HandleTeamProcessStop handles POST /api/team/stop and asks the running batch
// to stop by clearing the Running flag.
//
// The request is acknowledged whether or not a run is active. Work that is
// already in progress notices the flag at its next check rather than being
// interrupted. Any method other than POST is answered with 405.
func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	// Write the flag under the same mutex the status handler reads it with.
	teamProcessState.mu.Lock()
	teamProcessState.Running = false
	teamProcessState.mu.Unlock()

	Success(w, map[string]string{"message": "已发送停止信号"})
}
// HandleBatchHistory handles GET /api/batch/history and returns one page of
// past batch runs.
//
// Query parameters:
//   - page: 1-based page number, default 1
//   - page_size: rows per page, default 5
//
// A parameter that is missing, not a number, or not strictly positive falls
// back to its default. This also keeps page_size from ever being zero, which
// would otherwise panic in the total_pages division.
//
// Responses: 405 for non-GET methods, 500 when the database is not initialised
// or the query fails, otherwise runs/total/page/page_size/total_pages.
func HandleBatchHistory(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
		return
	}
	if database.Instance == nil {
		Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	query := r.URL.Query()
	// positiveParam parses a query value as a positive integer. It scans into
	// a scratch variable so a rejected value never leaks into the result.
	positiveParam := func(name string, fallback int) int {
		raw := query.Get(name)
		if raw == "" {
			return fallback
		}
		var v int
		if n, err := fmt.Sscanf(raw, "%d", &v); err != nil || n != 1 || v <= 0 {
			return fallback
		}
		return v
	}
	page := positiveParam("page", 1)
	pageSize := positiveParam("page_size", 5)

	runs, total, err := database.Instance.GetBatchRunsWithPagination(page, pageSize)
	if err != nil {
		Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
		return
	}

	// Ceiling division; pageSize is guaranteed to be > 0 here.
	totalPages := (total + pageSize - 1) / pageSize
	Success(w, map[string]interface{}{
		"runs":        runs,
		"total":       total,
		"page":        page,
		"page_size":   pageSize,
		"total_pages": totalPages,
	})
}
// HandleBatchDetail handles GET /api/batch/detail?id=xxx and returns a batch
// run together with its per-team results.
//
// Responses: 405 for non-GET methods, 500 when the database is not initialised
// or the lookup fails, 400 when the id parameter is missing or not an integer,
// otherwise an object with the keys "batch" and "results".
func HandleBatchDetail(w http.ResponseWriter, r *http.Request) {
	// Preconditions, checked in order: HTTP method first, then database availability.
	switch {
	case r.Method != http.MethodGet:
		Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
		return
	case database.Instance == nil:
		Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	rawID := r.URL.Query().Get("id")
	if rawID == "" {
		Error(w, http.StatusBadRequest, "缺少批次ID")
		return
	}

	// Parse the batch ID as a decimal integer.
	var id int64
	_, scanErr := fmt.Sscanf(rawID, "%d", &id)
	if scanErr != nil {
		Error(w, http.StatusBadRequest, "无效的批次ID")
		return
	}

	batch, teamResults, queryErr := database.Instance.GetBatchRunWithResults(id)
	if queryErr != nil {
		Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", queryErr))
		return
	}

	payload := map[string]interface{}{
		"batch":   batch,
		"results": teamResults,
	}
	Success(w, payload)
}
// runTeamProcess executes one batch run with a pool of workers.
//
// Every owner in req.Owners becomes one task. Up to req.ConcurrentTeams
// workers pull tasks and call processSingleTeam. Results are appended to the
// shared state, summed up, and stored per team in the database when one is
// available. When the function returns, normally or not, the Running flag is
// cleared and the batch record is updated with the totals and at most the
// first 10 error messages.
//
// A stop request (Running set to false) makes each worker exit before it takes
// its next task; a team that is already being processed is not interrupted here.
func runTeamProcess(req TeamProcessRequest) {
	totalOwners := len(req.Owners)
	workerCount := req.ConcurrentTeams // number of workers running at the same time
	if workerCount > totalOwners {
		workerCount = totalOwners
	}
	if workerCount <= 0 {
		workerCount = 2 // default concurrency
	}

	// isRunning reads the stop flag under the state mutex so the workers do
	// not race with this function's own write to it.
	isRunning := func() bool {
		teamProcessState.mu.Lock()
		defer teamProcessState.mu.Unlock()
		return teamProcessState.Running
	}

	// Create the batch record; a failure is logged and the run continues
	// without database bookkeeping (batchID stays 0).
	var batchID int64
	if database.Instance != nil {
		var err error
		batchID, err = database.Instance.CreateBatchRun(totalOwners)
		if err != nil {
			logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team")
		}
	}

	// Totals, also read by the deferred function below.
	var totalRegistered, totalAddedToS2A int
	var allErrors []string

	// Always clear the running flag and finalise the batch record, whether the
	// run finished normally or was cut short.
	defer func() {
		teamProcessState.mu.Lock()
		teamProcessState.Running = false
		teamProcessState.mu.Unlock()

		if database.Instance == nil || batchID <= 0 {
			return
		}
		errorsStr := ""
		if len(allErrors) > 0 {
			// Keep only the first 10 errors.
			if len(allErrors) > 10 {
				allErrors = allErrors[:10]
			}
			errorsStr = fmt.Sprintf("%v", allErrors)
		}
		if err := database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr); err != nil {
			logger.Error(fmt.Sprintf("更新批次记录失败: %v", err), "", "team")
		}
	}()

	logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")

	// Both channels are buffered to the task count, so neither the producer
	// nor a worker ever blocks on a send.
	taskChan := make(chan int, totalOwners)
	resultChan := make(chan TeamProcessResult, totalOwners)

	// Queue every task up front. The stop flag is checked by the workers
	// before each task, so no check is needed while filling the queue.
	for i := 0; i < totalOwners; i++ {
		taskChan <- i
	}
	close(taskChan)

	// Start the workers.
	var wg sync.WaitGroup
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for idx := range taskChan {
				if !isRunning() {
					return
				}
				resultChan <- processSingleTeam(idx, req)
				atomic.AddInt32(&teamProcessState.Completed, 1)
			}
		}()
	}

	// Close the result channel once every worker is done so the collection
	// loop below terminates.
	go func() {
		wg.Wait()
		close(resultChan)
	}()

	// Collect results and accumulate totals.
	for result := range resultChan {
		teamProcessState.mu.Lock()
		teamProcessState.Results = append(teamProcessState.Results, result)
		teamProcessState.mu.Unlock()

		totalRegistered += result.Registered
		totalAddedToS2A += result.AddedToS2A
		allErrors = append(allErrors, result.Errors...)

		// Store the per-team result in the database.
		if database.Instance != nil && batchID > 0 {
			dbResult := database.BatchTeamResult{
				TeamIndex:    result.TeamIndex,
				OwnerEmail:   result.OwnerEmail,
				TeamID:       result.TeamID,
				Registered:   result.Registered,
				AddedToS2A:   result.AddedToS2A,
				MemberEmails: result.MemberEmails,
				Errors:       result.Errors,
				DurationMs:   result.DurationMs,
			}
			if err := database.Instance.SaveBatchTeamResult(batchID, dbResult); err != nil {
				logger.Error(fmt.Sprintf("保存Team结果失败: %v", err), result.OwnerEmail, "team")
			}
		}
	}

	// Success rate = accounts added to S2A / accounts expected. With
	// IncludeOwner each team contributes one extra account, so the denominator
	// must include it or the rate could exceed 100%.
	accountsPerTeam := req.MembersPerTeam
	if req.IncludeOwner {
		accountsPerTeam++
	}
	expectedTotal := totalOwners * accountsPerTeam
	successRate := float64(0)
	if expectedTotal > 0 {
		successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100
	}
	logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%",
		atomic.LoadInt32(&teamProcessState.Completed), totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team")
}
// processSingleTeam runs the whole pipeline for the owner at req.Owners[idx]
// and returns what happened.
//
// Steps, in order:
//  1. Determine the team ID (stored AccountID, otherwise inviter.GetAccountID).
//  2. Send one test invite to detect a full or banned team.
//  3. Register req.MembersPerTeam members concurrently, then make up to 4
//     additional attempts for slots that failed.
//  4. Add every registered member to S2A (up to 2 attempts each).
//  5. Optionally add the owner itself to S2A (req.IncludeOwner).
//
// Along the way the owner record is marked processing / used / failed /
// invalid in the database when one is available. The result is a named return
// value so that the deferred recover handler can still fill in the error and
// duration after a panic.
func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResult) {
startTime := time.Now()
owner := req.Owners[idx]
result = TeamProcessResult{
TeamIndex: idx + 1,
OwnerEmail: owner.Email,
MemberEmails: make([]string, 0),
Errors: make([]string, 0),
}
// Fixed-width team number for log alignment (two columns, i.e. up to Team 99).
logPrefix := fmt.Sprintf("[Team %2d]", idx+1)
// Recover from panics so that a crash while processing one team does not
// take down the whole run.
defer func() {
if r := recover(); r != nil {
logger.Error(fmt.Sprintf("%s 处理异常: %v", logPrefix, r), owner.Email, "team")
result.Errors = append(result.Errors, fmt.Sprintf("处理异常: %v", r))
result.DurationMs = time.Since(startTime).Milliseconds()
// Mark the owner as failed; the original note says this makes it
// eligible for a retry (depends on MarkOwnerAsFailed — confirm).
if database.Instance != nil {
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}()
logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")
// Mark the owner as being processed.
if database.Instance != nil {
database.Instance.MarkOwnerAsProcessing(owner.Email)
}
// markOwnerResult records the final owner state: on success the owner is
// marked used and deleted, on failure it is marked failed.
markOwnerResult := func(success bool) {
if database.Instance != nil {
if success {
database.Instance.MarkOwnerAsUsed(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号已使用并删除: %s", logPrefix, owner.Email), owner.Email, "team")
} else {
// On failure mark as failed so it can be retried later
// (per the original note — confirm against MarkOwnerAsFailed).
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}
// Step 1: obtain the team ID, preferring the stored account_id.
var teamID string
inviter := invite.NewWithProxy(owner.Token, req.Proxy)
if owner.AccountID != "" {
// Use the account_id that came with the owner record.
teamID = owner.AccountID
inviter.SetAccountID(teamID) // the inviter must be told the ID as well
logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team")
} else {
// Nothing stored: ask the API.
var err error
teamID, err = inviter.GetAccountID()
if err != nil {
errStr := err.Error()
result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
// Token expired or invalid (detected by substring match on the error
// text): mark the owner invalid and delete it, no retry.
if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") ||
strings.Contains(errStr, "unauthorized") || strings.Contains(errStr, "invalid") {
logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号无效已删除: %s", logPrefix, owner.Email), owner.Email, "team")
}
return result
}
markOwnerResult(false)
return result
}
logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team")
}
result.TeamID = teamID
// Step 2: probe the invite endpoint with a generated address to detect a
// team that is banned or already full.
testEmail := mail.GenerateEmail()
if err := inviter.SendInvites([]string{testEmail}); err != nil {
errStr := err.Error()
// Invite limit reached ("401" or "maximum number of seats" in the error text).
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号已使用并删除(邀请已满): %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// Team banned / suspended / deactivated.
if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") ||
strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") ||
strings.Contains(errStr, "deactivated") {
logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号无效已删除(Team被封禁): %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "Team 被封禁")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// Any other invite error is not fatal: log it and carry on.
logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team")
}
// Step 3: register the members concurrently.
// Each member: invite -> register, with one retry on failure.
// Afterwards the team as a whole gets 4 additional make-up attempts.
type MemberAccount struct {
Email string
Password string
Success bool
}
children := make([]MemberAccount, req.MembersPerTeam)
var memberMu sync.Mutex
var memberWg sync.WaitGroup
// Shared flag: the team's invites are exhausted and every goroutine should stop.
var teamExhausted int32
// isTeamExhausted reports whether the exhausted flag has been set.
isTeamExhausted := func() bool {
return atomic.LoadInt32(&teamExhausted) == 1
}
// markTeamExhausted sets the flag; only the first caller (CAS 0 -> 1) logs
// and marks the owner as used and deleted.
markTeamExhausted := func() {
if atomic.CompareAndSwapInt32(&teamExhausted, 0, 1) {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号已使用并删除(Team耗尽): %s", logPrefix, owner.Email), owner.Email, "team")
}
}
}
// registerMember invites and registers one member for slot memberIdx, with
// one retry. It returns true once the slot has been filled.
registerMember := func(memberIdx int, email, password string) bool {
name := register.GenerateName()
birthdate := register.GenerateBirthdate()
for attempt := 0; attempt < 2; attempt++ { // at most 2 attempts: the first try plus one retry
// Stop when the run was cancelled or the team is exhausted.
if !teamProcessState.Running || isTeamExhausted() {
return false
}
currentEmail := email
currentPassword := password
if attempt > 0 {
// The retry uses a fresh email address and password.
currentEmail = mail.GenerateEmail()
currentPassword = register.GeneratePassword()
logger.Warning(fmt.Sprintf("%s [成员 %d] 重试, 新邮箱: %s", logPrefix, memberIdx+1, currentEmail), currentEmail, "team")
}
// Send the invite.
if err := inviter.SendInvites([]string{currentEmail}); err != nil {
errStr := err.Error()
logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
// Invite limit reached ("401" or "maximum number of seats" in the error text).
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
markTeamExhausted()
return false
}
continue
}
// Check again: another goroutine may have set the flag while this
// invite was in flight.
if isTeamExhausted() {
return false
}
// Register the account.
_, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 注册失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
continue
}
// Success: record the account in its slot.
memberMu.Lock()
children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true}
memberMu.Unlock()
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 注册成功", logPrefix, memberIdx+1), currentEmail, "team")
return true
}
return false
}
// First round: register all members concurrently, one goroutine per slot.
// NOTE(review): the deferred recover above runs in the calling goroutine
// only; a panic inside one of these goroutines would not be caught by it.
logger.Info(fmt.Sprintf("%s 开始并发注册 %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team")
for i := 0; i < req.MembersPerTeam; i++ {
memberWg.Add(1)
go func(idx int) {
defer memberWg.Done()
// Skip the slot if the team is already exhausted.
if isTeamExhausted() {
return
}
email := mail.GenerateEmail()
password := register.GeneratePassword()
logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team")
registerMember(idx, email, password)
}(i)
}
memberWg.Wait()
// Team exhausted: skip the make-up round and everything after it.
// NOTE(review): members that did register before the flag was set are not
// reported or added to S2A on this path — confirm that is intended.
if isTeamExhausted() {
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// Collect the slots that are still empty.
failedSlots := make([]int, 0)
for i, c := range children {
if !c.Success {
failedSlots = append(failedSlots, i)
}
}
// Second round: up to 4 additional attempts for the whole team (as long as
// the team is not exhausted and the run has not been stopped). Each attempt
// targets the first remaining failed slot.
teamRetries := 4
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running && !isTeamExhausted(); retry++ {
slotIdx := failedSlots[0]
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
email := mail.GenerateEmail()
password := register.GeneratePassword()
if registerMember(slotIdx, email, password) {
failedSlots = failedSlots[1:] // success: drop this slot from the list
}
}
// Check the exhausted flag once more after the make-up round.
if isTeamExhausted() {
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// Count the successful registrations.
registeredChildren := make([]MemberAccount, 0)
for _, c := range children {
if c.Success {
registeredChildren = append(registeredChildren, c)
result.MemberEmails = append(result.MemberEmails, c.Email)
result.Registered++
}
}
if len(failedSlots) > 0 {
result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots)))
}
logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team")
// Nothing registered: skip the S2A steps and mark the owner as failed.
if len(registeredChildren) == 0 {
logger.Warning(fmt.Sprintf("%s 没有成员注册成功,跳过入库步骤", logPrefix), owner.Email, "team")
result.DurationMs = time.Since(startTime).Milliseconds()
markOwnerResult(false)
return result
}
// Step 4: authorise each member and add it to S2A, with a retry.
// NOTE(review): config.Global is dereferenced here without a nil check; a nil
// value would panic and be handled by the deferred recover above.
for i, child := range registeredChildren {
if !teamProcessState.Running {
break
}
var s2aSuccess bool
for attempt := 0; attempt < 2; attempt++ { // at most one retry
if attempt > 0 {
logger.Warning(fmt.Sprintf("%s [成员 %d] 入库重试...", logPrefix, i+1), child.Email, "team")
}
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 获取授权URL失败: %v", logPrefix, i+1, err), child.Email, "team")
continue
}
// Pick the browser automation backend requested in the config.
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
} else {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
}
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 浏览器授权失败: %v", logPrefix, i+1, err), child.Email, "team")
continue
}
// Submit the authorisation code to S2A.
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
child.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] S2A提交失败: %v", logPrefix, i+1, err), child.Email, "team")
continue
}
s2aSuccess = true
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team")
break
}
if !s2aSuccess {
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库失败", i+1))
}
}
// Step 5: add the owner account to S2A as well (when enabled), with a retry.
if req.IncludeOwner && teamProcessState.Running {
logger.Info(fmt.Sprintf("%s 开始将母号入库到 S2A", logPrefix), owner.Email, "team")
var ownerSuccess bool
for attempt := 0; attempt < 2; attempt++ { // at most one retry
if attempt > 0 {
logger.Warning(fmt.Sprintf("%s [母号] 入库重试...", logPrefix), owner.Email, "team")
}
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
logger.Error(fmt.Sprintf("%s [母号] 获取授权URL失败: %v", logPrefix, err), owner.Email, "team")
continue
}
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy)
} else {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy)
}
if err != nil {
logger.Error(fmt.Sprintf("%s [母号] 浏览器授权失败: %v", logPrefix, err), owner.Email, "team")
continue
}
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
owner.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
logger.Error(fmt.Sprintf("%s [母号] S2A提交失败: %v", logPrefix, err), owner.Email, "team")
continue
}
ownerSuccess = true
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team")
break
}
if !ownerSuccess {
result.Errors = append(result.Errors, "母号入库失败")
}
}
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team")
// Set the owner state from the S2A outcome: as soon as at least one account
// was added, the owner counts as used.
markOwnerResult(result.AddedToS2A > 0)
return result
}
// registerWithTimeout runs the complete registration flow for one account:
// create the registration client, initialise the session, go through the
// authorize steps, register the credentials, request the verification email,
// fetch and validate the one-time code, and finally create the account with
// the given name and birthdate.
//
// The only time-bounded step visible here is the verification-code lookup,
// which is tried with a 5-second window and then once more with 15 seconds.
//
// It returns the registration client on success. On failure it returns nil and
// an error naming the failed step. The underlying cause is wrapped, so callers
// can inspect it with errors.Is / errors.As.
func registerWithTimeout(email, password, name, birthdate, proxy string) (*register.ChatGPTReg, error) {
	reg, err := register.New(proxy)
	if err != nil {
		return nil, err
	}
	if err := reg.InitSession(); err != nil {
		return nil, fmt.Errorf("初始化失败: %w", err)
	}
	if err := reg.GetAuthorizeURL(email); err != nil {
		return nil, fmt.Errorf("获取授权URL失败: %w", err)
	}
	if err := reg.StartAuthorize(); err != nil {
		return nil, fmt.Errorf("启动授权失败: %w", err)
	}
	if err := reg.Register(email, password); err != nil {
		return nil, fmt.Errorf("注册失败: %w", err)
	}
	if err := reg.SendVerificationEmail(); err != nil {
		return nil, fmt.Errorf("发送邮件失败: %w", err)
	}

	// Fetch the verification code: a short window first, then one longer one.
	otpCode, err := mail.GetVerificationCode(email, 5*time.Second)
	if err != nil {
		otpCode, err = mail.GetVerificationCode(email, 15*time.Second)
		if err != nil {
			// Keep the cause: the lookup may fail for reasons other than a timeout.
			return nil, fmt.Errorf("验证码获取超时: %w", err)
		}
	}

	if err := reg.ValidateOTP(otpCode); err != nil {
		return nil, fmt.Errorf("OTP验证失败: %w", err)
	}
	if err := reg.CreateAccount(name, birthdate); err != nil {
		return nil, fmt.Errorf("创建账户失败: %w", err)
	}

	// The returned value is deliberately discarded. The call is kept for
	// whatever effect it has on reg — NOTE(review): confirm it is still needed.
	_ = reg.GetSessionToken()
	return reg, nil
}