Files
codexautopool/backend/internal/api/team_process.go

1202 lines
41 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package api
import (
"encoding/json"
"fmt"
"math/rand"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
"codex-pool/internal/auth"
"codex-pool/internal/config"
"codex-pool/internal/database"
"codex-pool/internal/demote"
"codex-pool/internal/invite"
"codex-pool/internal/logger"
"codex-pool/internal/mail"
"codex-pool/internal/proxyutil"
"codex-pool/internal/register"
)
// TeamOwner 团队母号信息
type TeamOwner struct {
Email string `json:"email"`
Password string `json:"password"`
Token string `json:"token"`
AccountID string `json:"account_id"` // 已存储的 account_id如有则直接使用
}
// TeamProcessRequest 团队处理请求
type TeamProcessRequest struct {
// Owner 账号列表
Owners []TeamOwner `json:"owners"`
// 配置
MembersPerTeam int `json:"members_per_team"` // 每个 Team 的成员数
ConcurrentTeams int `json:"concurrent_teams"` // 并发 Team 数量
ConcurrentS2A int `json:"concurrent_s2a"` // 入库并发数默认2
BrowserType string `json:"browser_type"` // "chromedp" 或 "rod"
Headless bool `json:"headless"` // 是否无头模式
Proxy string `json:"proxy"` // 代理设置
IncludeOwner bool `json:"include_owner"` // 母号也入库到 S2A
ProcessCount int `json:"process_count"` // 处理数量0表示全部
}
// TeamProcessResult 团队处理结果
type TeamProcessResult struct {
TeamIndex int `json:"team_index"`
OwnerEmail string `json:"owner_email"`
TeamID string `json:"team_id"`
Registered int `json:"registered"`
AddedToS2A int `json:"added_to_s2a"`
MemberEmails []string `json:"member_emails"`
Errors []string `json:"errors"`
DurationMs int64 `json:"duration_ms"`
}
// TeamProcessState 处理状态
type TeamProcessState struct {
Running bool `json:"running"`
StartedAt time.Time `json:"started_at"`
TotalTeams int `json:"total_teams"`
Completed int32 `json:"completed"`
Results []TeamProcessResult `json:"results"`
mu sync.Mutex
stopCh chan struct{} // 停止信号 channelclose 后所有 select 立即返回
}
var teamProcessState = &TeamProcessState{}
// isStopped 检查是否已收到停止信号(非阻塞)
func isStopped() bool {
select {
case <-teamProcessState.stopCh:
return true
default:
return false
}
}
// waitGroupWithTimeout 带超时的 WaitGroup 等待,超时返回 false
func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
done := make(chan struct{})
go func() { wg.Wait(); close(done) }()
select {
case <-done:
return true
case <-teamProcessState.stopCh:
return false
case <-time.After(timeout):
return false
}
}
// getProxyDisplay 获取代理显示名称(隐藏密码)
func getProxyDisplay(proxy string) string {
if proxy == "" {
return "无代理"
}
// 尝试解析 URL只返回 host 部分
if strings.Contains(proxy, "@") {
parts := strings.Split(proxy, "@")
if len(parts) >= 2 {
return parts[len(parts)-1] // 返回 @ 后面的 host:port 部分
}
}
// 去掉协议前缀
proxy = strings.TrimPrefix(proxy, "http://")
proxy = strings.TrimPrefix(proxy, "https://")
proxy = strings.TrimPrefix(proxy, "socks5://")
return proxy
}
// HandleTeamProcess POST /api/team/process - 启动 Team 批量处理
func HandleTeamProcess(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
return
}
// 检查是否正在运行
if teamProcessState.Running {
Error(w, http.StatusConflict, "已有任务正在运行")
return
}
var req TeamProcessRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
Error(w, http.StatusBadRequest, "请求格式错误")
return
}
// 如果没有传入 owners从数据库获取待处理的母号
if len(req.Owners) == 0 {
pendingOwners, err := database.Instance.GetPendingOwners()
if err != nil {
Error(w, http.StatusInternalServerError, fmt.Sprintf("获取待处理账号失败: %v", err))
return
}
if len(pendingOwners) == 0 {
Error(w, http.StatusBadRequest, "没有待处理的母号,请先上传账号文件")
return
}
// 转换为请求格式(包含已存储的 account_id
for _, o := range pendingOwners {
req.Owners = append(req.Owners, TeamOwner{
Email: o.Email,
Password: o.Password,
Token: o.Token,
AccountID: o.AccountID, // 直接使用数据库中存储的 account_id
})
}
logger.Info(fmt.Sprintf("从数据库加载 %d 个待处理母号", len(req.Owners)), "", "team")
}
// 根据 ProcessCount 限制处理数量
if req.ProcessCount > 0 && req.ProcessCount < len(req.Owners) {
req.Owners = req.Owners[:req.ProcessCount]
logger.Info(fmt.Sprintf("限制处理数量: %d 个母号", req.ProcessCount), "", "team")
}
if req.MembersPerTeam <= 0 {
req.MembersPerTeam = 4
}
if req.ConcurrentTeams <= 0 {
req.ConcurrentTeams = len(req.Owners)
}
if req.ConcurrentTeams > len(req.Owners) {
req.ConcurrentTeams = len(req.Owners)
}
if req.BrowserType == "" {
req.BrowserType = "chromedp" // 默认使用 Chromedp
}
if req.ConcurrentS2A <= 0 {
req.ConcurrentS2A = 2 // 默认入库并发数为 2
}
if req.ConcurrentS2A > 4 {
req.ConcurrentS2A = 4 // 最大入库并发数为 4避免浏览器资源耗尽
}
if req.Proxy == "" && config.Global != nil {
req.Proxy = config.Global.GetProxy() // 使用新的代理获取方法
}
if req.Proxy != "" {
// 如果是模式字符串pool:random, pool:id:N跳过 Normalize
if !strings.HasPrefix(req.Proxy, "pool:") && req.Proxy != "[RANDOM]" {
normalized, err := proxyutil.Normalize(req.Proxy)
if err != nil {
Error(w, http.StatusBadRequest, fmt.Sprintf("代理格式错误: %v", err))
return
}
req.Proxy = normalized
}
}
// 启动新任务前,先清理数据库中遗留的 running 状态批次
if database.Instance != nil {
if affected, err := database.Instance.CleanupStuckBatchRuns(); err == nil && affected > 0 {
logger.Warning(fmt.Sprintf("清理了 %d 个遗留的运行中批次记录", affected), "", "team")
}
}
// 初始化状态
teamProcessState.Running = true
teamProcessState.stopCh = make(chan struct{})
teamProcessState.StartedAt = time.Now()
teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理
teamProcessState.Completed = 0
teamProcessState.Results = make([]TeamProcessResult, 0, len(req.Owners))
// 异步执行
go runTeamProcess(req)
Success(w, map[string]interface{}{
"message": "任务已启动",
"total_teams": len(req.Owners),
"concurrent_teams": req.ConcurrentTeams,
"started_at": teamProcessState.StartedAt,
})
}
// HandleTeamProcessStatus GET /api/team/status - 获取处理状态
func HandleTeamProcessStatus(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
return
}
teamProcessState.mu.Lock()
defer teamProcessState.mu.Unlock()
Success(w, map[string]interface{}{
"running": teamProcessState.Running,
"started_at": teamProcessState.StartedAt,
"total_teams": teamProcessState.TotalTeams,
"completed": teamProcessState.Completed,
"results": teamProcessState.Results,
"elapsed_ms": time.Since(teamProcessState.StartedAt).Milliseconds(),
})
}
// HandleTeamProcessStop POST /api/team/stop - 停止处理
func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
return
}
teamProcessState.Running = false
// 关闭 stopCh 广播停止信号,所有 select 监听的 goroutine 立即唤醒
if teamProcessState.stopCh != nil {
select {
case <-teamProcessState.stopCh:
// 已经关闭过了,不重复关闭
default:
close(teamProcessState.stopCh)
}
}
logger.Warning("收到停止信号,正在终止所有处理...", "", "team")
Success(w, map[string]string{"message": "已发送停止信号"})
}
// HandleBatchHistory GET /api/batch/history - 获取历史批次(分页)
func HandleBatchHistory(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
return
}
if database.Instance == nil {
Error(w, http.StatusInternalServerError, "数据库未初始化")
return
}
// 获取分页参数
page := 1
pageSize := 5
if p := r.URL.Query().Get("page"); p != "" {
if v, err := fmt.Sscanf(p, "%d", &page); err == nil && v > 0 {
// page已设置
}
}
if ps := r.URL.Query().Get("page_size"); ps != "" {
if v, err := fmt.Sscanf(ps, "%d", &pageSize); err == nil && v > 0 {
// pageSize已设置
}
}
runs, total, err := database.Instance.GetBatchRunsWithPagination(page, pageSize)
if err != nil {
Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
return
}
totalPages := (total + pageSize - 1) / pageSize
Success(w, map[string]interface{}{
"runs": runs,
"total": total,
"page": page,
"page_size": pageSize,
"total_pages": totalPages,
})
}
// HandleBatchDetail GET /api/batch/detail?id=xxx - 获取批次详情包含Team结果
func HandleBatchDetail(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
return
}
if database.Instance == nil {
Error(w, http.StatusInternalServerError, "数据库未初始化")
return
}
idStr := r.URL.Query().Get("id")
if idStr == "" {
Error(w, http.StatusBadRequest, "缺少批次ID")
return
}
var batchID int64
if _, err := fmt.Sscanf(idStr, "%d", &batchID); err != nil {
Error(w, http.StatusBadRequest, "无效的批次ID")
return
}
run, results, err := database.Instance.GetBatchRunWithResults(batchID)
if err != nil {
Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
return
}
Success(w, map[string]interface{}{
"batch": run,
"results": results,
})
}
// runTeamProcess 执行 Team 批量处理 - 使用工作池模式
func runTeamProcess(req TeamProcessRequest) {
totalOwners := len(req.Owners)
workerCount := req.ConcurrentTeams // 同时运行的 worker 数量
if workerCount > totalOwners {
workerCount = totalOwners
}
if workerCount <= 0 {
workerCount = 2 // 默认 2 个并发
}
// 创建批次记录
var batchID int64
if database.Instance != nil {
var err error
batchID, err = database.Instance.CreateBatchRun(totalOwners)
if err != nil {
logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team")
}
}
// 统计变量(在 defer 中使用)
var totalRegistered, totalAddedToS2A int
var allErrors []string
// 确保任务结束时更新状态和批次记录
defer func() {
teamProcessState.Running = false
// 清理残留的 processing 状态母号,重置为 valid
if database.Instance != nil {
if affected, err := database.Instance.CleanupStuckProcessingOwners(); err == nil && affected > 0 {
logger.Warning(fmt.Sprintf("批次结束,重置了 %d 个残留的处理中母号为有效状态", affected), "", "team")
}
}
// 无论任务是正常完成还是异常中断,都更新批次记录状态
if database.Instance != nil && batchID > 0 {
errorsStr := ""
if len(allErrors) > 0 {
// 只保留前10条错误
if len(allErrors) > 10 {
allErrors = allErrors[:10]
}
errorsStr = fmt.Sprintf("%v", allErrors)
}
if err := database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr); err != nil {
logger.Error(fmt.Sprintf("更新批次记录失败: %v", err), "", "team")
}
}
}()
logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")
// 任务队列
taskChan := make(chan int, totalOwners)
resultChan := make(chan TeamProcessResult, totalOwners)
// 连续入库为0的暂停机制
var pauseMu sync.Mutex
pauseCond := sync.NewCond(&pauseMu)
paused := false
var wg sync.WaitGroup
// 启动 worker
for w := 0; w < workerCount; w++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for idx := range taskChan {
if isStopped() {
return
}
// 检查是否处于暂停状态,等待恢复
pauseMu.Lock()
for paused {
pauseCond.Wait()
}
pauseMu.Unlock()
result := processSingleTeam(idx, req)
resultChan <- result
atomic.AddInt32(&teamProcessState.Completed, 1)
}
}(w)
}
// 发送任务
go func() {
for i := 0; i < totalOwners; i++ {
select {
case <-teamProcessState.stopCh:
break
case taskChan <- i:
}
if isStopped() {
break
}
}
close(taskChan)
}()
// 等待完成并收集结果
go func() {
wg.Wait()
close(resultChan)
}()
// 连续入库为0的计数器
consecutiveZeroS2A := 0
// 收集结果并统计
for result := range resultChan {
teamProcessState.mu.Lock()
teamProcessState.Results = append(teamProcessState.Results, result)
teamProcessState.mu.Unlock()
totalRegistered += result.Registered
totalAddedToS2A += result.AddedToS2A
allErrors = append(allErrors, result.Errors...)
// 检测连续入库为0的情况
if result.AddedToS2A == 0 {
consecutiveZeroS2A++
if consecutiveZeroS2A >= 2 {
logger.Warning(fmt.Sprintf("⚠ 连续 %d 个 Team 入库数为 0 (最近: Team %d - %s),暂停 1 分钟后继续...",
consecutiveZeroS2A, result.TeamIndex, result.OwnerEmail), "", "team")
// 设置暂停标志,阻止 worker 取新任务
pauseMu.Lock()
paused = true
pauseMu.Unlock()
// 等待 1 分钟(可被停止信号中断)
select {
case <-time.After(1 * time.Minute):
logger.Info("暂停结束,恢复批量处理...", "", "team")
case <-teamProcessState.stopCh:
logger.Warning("暂停期间收到停止信号,终止处理", "", "team")
}
// 恢复 worker
pauseMu.Lock()
paused = false
pauseMu.Unlock()
pauseCond.Broadcast()
// 重置计数器
consecutiveZeroS2A = 0
}
} else {
// 入库不为0重置连续计数
consecutiveZeroS2A = 0
}
// 保存单个Team结果到数据库
if database.Instance != nil && batchID > 0 {
dbResult := database.BatchTeamResult{
TeamIndex: result.TeamIndex,
OwnerEmail: result.OwnerEmail,
TeamID: result.TeamID,
Registered: result.Registered,
AddedToS2A: result.AddedToS2A,
MemberEmails: result.MemberEmails,
Errors: result.Errors,
DurationMs: result.DurationMs,
}
if err := database.Instance.SaveBatchTeamResult(batchID, dbResult); err != nil {
logger.Error(fmt.Sprintf("保存Team结果失败: %v", err), result.OwnerEmail, "team")
}
}
}
// 计算成功率
expectedTotal := totalOwners * req.MembersPerTeam
successRate := float64(0)
if expectedTotal > 0 {
successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100
}
logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%",
teamProcessState.Completed, totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team")
}
// processSingleTeam 处理单个 Team
func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResult) {
startTime := time.Now()
owner := req.Owners[idx]
result = TeamProcessResult{
TeamIndex: idx + 1,
OwnerEmail: owner.Email,
MemberEmails: make([]string, 0),
Errors: make([]string, 0),
}
// 固定宽度的 Team 编号 (支持到 Team 99)
logPrefix := fmt.Sprintf("[Team %2d]", idx+1)
// panic 恢复,防止单个 Team 处理崩溃影响整个任务
defer func() {
if r := recover(); r != nil {
logger.Error(fmt.Sprintf("%s 处理异常: %v", logPrefix, r), owner.Email, "team")
result.Errors = append(result.Errors, fmt.Sprintf("处理异常: %v", r))
result.DurationMs = time.Since(startTime).Milliseconds()
// 恢复为 valid 允许重试
if database.Instance != nil {
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}()
logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")
// 标记 owner 为处理中
if database.Instance != nil {
database.Instance.MarkOwnerAsProcessing(owner.Email)
}
// 处理失败时的清理函数
markOwnerResult := func(success bool) {
if database.Instance != nil {
if success {
database.Instance.MarkOwnerAsUsed(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号已使用并删除: %s", logPrefix, owner.Email), owner.Email, "team")
// 如果开启了母号降级,尝试降级
if config.Global != nil && config.Global.DemoteAfterUse {
go func() {
logger.Info(fmt.Sprintf("%s 尝试降级母号...", logPrefix), owner.Email, "team")
result := demote.DemoteOwner(demote.DemoteRequest{
AccessToken: owner.Token,
AccountID: owner.AccountID,
Role: "standard-user",
Proxy: req.Proxy,
})
if result.Success {
logger.Success(fmt.Sprintf("%s 母号已降级为普通成员", logPrefix), owner.Email, "team")
} else {
logger.Warning(fmt.Sprintf("%s 母号降级失败: %s", logPrefix, result.Error), owner.Email, "team")
}
}()
}
} else {
// 失败时恢复为 valid允许重试
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}
// Step 0: 预检查账户状态(封禁/Token过期检测
resolvedProxy := database.Instance.ResolveProxy(req.Proxy)
preChecker := invite.NewWithProxy(owner.Token, resolvedProxy)
accountStatus := preChecker.CheckAccountStatus()
if accountStatus.Status == "banned" {
logger.Error(fmt.Sprintf("%s 母号被封禁,跳过处理: %s", logPrefix, accountStatus.Error), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 封禁母号已删除: %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "账户被封禁")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
if accountStatus.Status == "token_expired" {
logger.Error(fmt.Sprintf("%s Token已过期跳过处理", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s Token过期母号已删除: %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "Token已过期")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
if accountStatus.Status == "error" {
logger.Warning(fmt.Sprintf("%s 账户状态检查失败: %s继续尝试", logPrefix, accountStatus.Error), owner.Email, "team")
} else if accountStatus.PlanType == "free" {
logger.Warning(fmt.Sprintf("%s 母号 plan 为 free非 Team 账户,移除", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s free 母号已删除: %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "账户 plan 为 free非 Team 账户")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
} else {
logger.Info(fmt.Sprintf("%s 账户状态正常: %s", logPrefix, accountStatus.PlanType), owner.Email, "team")
}
// Step 1: 获取 Team ID优先使用已存储的 account_id
var teamID string
inviter := invite.NewWithProxy(owner.Token, resolvedProxy)
if owner.AccountID != "" {
// 直接使用数据库中存储的 account_id
teamID = owner.AccountID
inviter.SetAccountID(teamID) // 必须设置到 inviter 中
logger.Info(fmt.Sprintf("%s 使用已存储的 Team ID: %s", logPrefix, teamID), owner.Email, "team")
} else {
// 如果没有存储,才请求 API 获取
var err error
teamID, err = inviter.GetAccountID()
if err != nil {
errStr := err.Error()
result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
// Token 过期或无效,标记为 invalid 不再重试
if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") ||
strings.Contains(errStr, "unauthorized") || strings.Contains(errStr, "invalid") {
logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号无效已删除: %s", logPrefix, owner.Email), owner.Email, "team")
}
return result
}
markOwnerResult(false)
return result
}
logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team")
}
result.TeamID = teamID
// Step 2: 测试邀请功能(检测 Team 是否被封禁或已满)
testEmail := mail.GenerateEmail()
if err := inviter.SendInvites([]string{testEmail}); err != nil {
errStr := err.Error()
// 检测 Team 已达邀请上限401 或 maximum number of seats
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号已使用并删除(邀请已满): %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 检测 Team 被封禁
if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") ||
strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") ||
strings.Contains(errStr, "deactivated") {
logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号无效已删除(Team被封禁): %s", logPrefix, owner.Email), owner.Email, "team")
}
result.Errors = append(result.Errors, "Team 被封禁")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 其他邀请错误,继续尝试
logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team")
}
// Step 3: 流水线模式 - 注册成功的成员立即开始入库
// 每个成员:邀请 → 注册 → 入库失败重试1次
// Team 有4次额外补救机会
type MemberAccount struct {
Email string
Password string
Success bool
S2ADone bool // 入库是否完成
S2AOK bool // 入库是否成功
}
children := make([]MemberAccount, req.MembersPerTeam)
var memberMu sync.Mutex
// 共享标志Team 邀请已满,所有 goroutine 应停止
var teamExhausted int32
// 入库计数器
var s2aSuccessCount int32
var s2aFailCount int32
// Team 级别 500 错误熔断器:当同 Team 多个成员连续遇到 500 时快速失败
var consecutive500Fails int32
// 入库并发控制信号量
s2aSem := make(chan struct{}, req.ConcurrentS2A)
// 检查 Team 是否已满的辅助函数
isTeamExhausted := func() bool {
return atomic.LoadInt32(&teamExhausted) == 1
}
// 标记 Team 已满
markTeamExhausted := func() {
if atomic.CompareAndSwapInt32(&teamExhausted, 0, 1) {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
logger.Info(fmt.Sprintf("%s 母号已使用并删除(Team耗尽): %s", logPrefix, owner.Email), owner.Email, "team")
}
}
}
// 入库单个成员的函数
doS2A := func(memberIdx int, memberEmail, memberPassword string) bool {
memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1)
memberStartTime := time.Now()
// 检查停止信号
if isStopped() {
return false
}
// 获取入库信号量3分钟超时可被停止信号中断
select {
case s2aSem <- struct{}{}:
case <-teamProcessState.stopCh:
logger.Warning(fmt.Sprintf("%s 收到停止信号,跳过入库", memberLogPrefix), memberEmail, "team")
return false
case <-time.After(3 * time.Minute):
logger.Warning(fmt.Sprintf("%s 入库信号量等待超时 (3分钟)", memberLogPrefix), memberEmail, "team")
atomic.AddInt32(&s2aFailCount, 1)
memberMu.Lock()
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库信号量超时", memberIdx+1))
memberMu.Unlock()
return false
}
defer func() { <-s2aSem }()
var s2aSuccess bool
var lastError string
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
// 检查停止信号
if isStopped() {
return false
}
// 熔断检查:同 Team 已有 3+ 个成员连续 500 失败,快速跳过
if atomic.LoadInt32(&consecutive500Fails) >= 3 {
logger.Warning(fmt.Sprintf("%s 同 Team 已有多个成员连续 500 失败,跳过入库", memberLogPrefix), memberEmail, "team")
atomic.AddInt32(&s2aFailCount, 1)
memberMu.Lock()
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库跳过: 服务器持续 500 (熔断)", memberIdx+1))
memberMu.Unlock()
return false
}
if attempt > 0 {
// 重试前短暂等待(可被停止信号中断)
retryDelay := time.Duration(3) * time.Second
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team")
select {
case <-time.After(retryDelay):
case <-teamProcessState.stopCh:
return false
}
}
// 每次尝试都从代理池获取新代理(避免复用失败代理)
proxyToUse, _ := database.Instance.GetRandomCodexProxy()
proxyDisplay := "直连"
if proxyToUse != "" {
proxyDisplay = getProxyDisplay(proxyToUse)
}
if attempt == 0 {
logger.Status(fmt.Sprintf("%s 入库中... | 邮箱: %s | 代理: %s", memberLogPrefix, memberEmail, proxyDisplay), memberEmail, "team")
} else {
logger.Status(fmt.Sprintf("%s 入库重试中... | 代理: %s", memberLogPrefix, proxyDisplay), memberEmail, "team")
}
// 创建日志回调 - 只显示关键步骤(去重:每个步骤只输出一次)
loggedSteps := make(map[auth.AuthStep]bool)
authLogger := auth.NewAuthLogger(memberEmail, logPrefix, memberIdx+1, func(entry auth.AuthLogEntry) {
if entry.IsError {
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, entry.Message), memberEmail, "team")
} else {
// 只显示关键步骤,且每个步骤只输出首次(避免重试时重复日志)
switch entry.Step {
case auth.StepInputEmail:
if !loggedSteps[entry.Step] {
loggedSteps[entry.Step] = true
logger.Info(fmt.Sprintf("%s 提交邮箱: %s", memberLogPrefix, memberEmail), memberEmail, "team")
}
case auth.StepInputPassword:
if !loggedSteps[entry.Step] {
loggedSteps[entry.Step] = true
logger.Info(fmt.Sprintf("%s 验证密码...", memberLogPrefix), memberEmail, "team")
}
case auth.StepSelectWorkspace:
if !loggedSteps[entry.Step] {
loggedSteps[entry.Step] = true
logger.Info(fmt.Sprintf("%s 选择工作区: %s", memberLogPrefix, teamID), memberEmail, "team")
}
case auth.StepComplete:
logger.Info(fmt.Sprintf("%s 授权成功,获取到授权码", memberLogPrefix), memberEmail, "team")
}
}
})
// 获取授权 URL
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
lastError = fmt.Sprintf("获取授权URL失败: %v", err)
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team")
continue
}
// 根据配置选择授权方式
var code string
if config.Global.AuthMethod == "api" {
code, err = auth.CompleteWithCodexAPI(memberEmail, memberPassword, teamID, s2aResp.Data.AuthURL, s2aResp.Data.SessionID, proxyToUse, authLogger)
if proxyToUse != "" {
database.Instance.UpdateCodexProxyStats(proxyToUse, err == nil)
}
} else {
code, err = auth.CompleteWithChromedpLogged(s2aResp.Data.AuthURL, memberEmail, memberPassword, teamID, req.Headless, req.Proxy, authLogger)
}
if err != nil {
lastError = fmt.Sprintf("浏览器授权失败: %v", err)
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team")
// 跟踪 500 错误用于熔断
if strings.Contains(err.Error(), "500") || strings.Contains(err.Error(), "重试已耗尽") {
atomic.AddInt32(&consecutive500Fails, 1)
}
continue
}
// 授权成功,重置 500 计数
atomic.StoreInt32(&consecutive500Fails, 0)
// 提交到 S2A
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
"team-"+memberEmail,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
lastError = fmt.Sprintf("S2A提交失败: %v", err)
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team")
continue
}
s2aSuccess = true
memberDuration := time.Since(memberStartTime)
logger.Success(fmt.Sprintf("%s ✓ 入库成功 (总耗时: %.1fs)", memberLogPrefix, memberDuration.Seconds()), memberEmail, "team")
break
}
if s2aSuccess {
atomic.AddInt32(&s2aSuccessCount, 1)
} else {
atomic.AddInt32(&s2aFailCount, 1)
memberMu.Lock()
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库失败: %s", memberIdx+1, lastError))
memberMu.Unlock()
}
return s2aSuccess
}
// 注册并入库单个成员的函数带1次重试- 流水线模式
var s2aWg sync.WaitGroup
registerAndS2AMember := func(memberIdx int, email, password string) bool {
name := register.GenerateName()
birthdate := register.GenerateBirthdate()
memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1)
regStartTime := time.Now()
for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次首次+1次重试
// 检查是否应该停止
if isStopped() || isTeamExhausted() {
return false
}
currentEmail := email
currentPassword := password
if attempt > 0 {
// 注册失败重试:保持原邮箱(邀请已发送),仅重新注册
logger.Warning(fmt.Sprintf("%s 注册重试 (第%d次), 保持邮箱: %s", memberLogPrefix, attempt+1, currentEmail), currentEmail, "team")
}
// 首次尝试时发送邀请,重试时跳过(邀请已发送到该邮箱)
if attempt == 0 {
if err := inviter.SendInvites([]string{currentEmail}); err != nil {
errStr := err.Error()
logger.Error(fmt.Sprintf("%s 邀请失败: %v", memberLogPrefix, err), currentEmail, "team")
// 检测 Team 已达邀请上限401 或 maximum number of seats
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
markTeamExhausted()
return false
}
// 邀请失败时换新邮箱重试
email = mail.GenerateEmail()
password = register.GeneratePassword()
currentEmail = email
currentPassword = password
logger.Warning(fmt.Sprintf("%s 邀请失败,换新邮箱: %s", memberLogPrefix, currentEmail), currentEmail, "team")
continue
}
}
// 再次检查是否应该停止(邀请期间其他 goroutine 可能已标记)
if isTeamExhausted() {
return false
}
// 注册 - 从代理池获取随机代理
regProxy := req.Proxy
if database.Instance != nil {
if poolProxy, poolErr := database.Instance.GetRandomCodexProxy(); poolErr == nil && poolProxy != "" {
regProxy = poolProxy
}
}
regProxyDisplay := "直连"
if regProxy != "" {
regProxyDisplay = getProxyDisplay(regProxy)
}
logger.Info(fmt.Sprintf("%s Email: %s | Password: %s | Proxy: %s", memberLogPrefix, currentEmail, currentPassword, regProxyDisplay), currentEmail, "team")
_, err := register.APIRegister(currentEmail, currentPassword, name, birthdate, regProxy, memberLogPrefix)
if err != nil {
logger.Error(fmt.Sprintf("%s [注册失败] %v", memberLogPrefix, err), currentEmail, "team")
continue
}
// 注册成功
regDuration := time.Since(regStartTime)
memberMu.Lock()
children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true}
result.MemberEmails = append(result.MemberEmails, currentEmail)
result.Registered++
memberMu.Unlock()
logger.Success(fmt.Sprintf("%s ✓ 注册成功 (耗时: %.1fs)", memberLogPrefix, regDuration.Seconds()), currentEmail, "team")
// 流水线:注册成功后等待再开始入库(避免触发邮箱验证 + 成员间错开避免并发冲突)
s2aWg.Add(1)
go func(idx int, e, p string) {
defer s2aWg.Done()
// 基础 1 秒 + 成员索引 * 1 秒错开 + 0~1 秒随机抖动
stagger := 1*time.Second + time.Duration(idx)*time.Second + time.Duration(rand.Intn(1000))*time.Millisecond
select {
case <-time.After(stagger):
case <-teamProcessState.stopCh:
return
}
success := doS2A(idx, e, p)
memberMu.Lock()
children[idx].S2ADone = true
children[idx].S2AOK = success
memberMu.Unlock()
}(memberIdx, currentEmail, currentPassword)
return true
}
return false
}
// 第一轮:并发注册成员(注册成功后立即入库)
logger.Info(fmt.Sprintf("%s ════════ 开始流水线处理 ════════ 目标: %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team")
pipelineStartTime := time.Now()
var regWg sync.WaitGroup
for i := 0; i < req.MembersPerTeam; i++ {
regWg.Add(1)
go func(idx int) {
defer regWg.Done()
// 检查是否应该停止
if isStopped() || isTeamExhausted() {
return
}
email := mail.GenerateEmail()
password := register.GeneratePassword()
registerAndS2AMember(idx, email, password)
}(i)
}
if !waitGroupWithTimeout(&regWg, 8*time.Minute) {
logger.Warning(fmt.Sprintf("%s 注册阶段超时 (8分钟),继续处理已完成的成员", logPrefix), owner.Email, "team")
}
// 如果 Team 已满,等待已启动的入库完成
if isTeamExhausted() {
if !waitGroupWithTimeout(&s2aWg, 5*time.Minute) {
logger.Warning(fmt.Sprintf("%s 入库等待超时 (5分钟)", logPrefix), owner.Email, "team")
}
result.AddedToS2A = int(atomic.LoadInt32(&s2aSuccessCount))
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
markOwnerResult(result.AddedToS2A > 0)
return result
}
// 统计失败的成员
failedSlots := make([]int, 0)
for i, c := range children {
if !c.Success {
failedSlots = append(failedSlots, i)
}
}
// 第二轮Team 有 4 次额外补救机会(如果 Team 未满)
teamRetries := 4
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && !isStopped() && !isTeamExhausted(); retry++ {
slotIdx := failedSlots[0]
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
email := mail.GenerateEmail()
password := register.GeneratePassword()
if registerAndS2AMember(slotIdx, email, password) {
failedSlots = failedSlots[1:] // 成功,移除这个槽位
}
}
// 等待所有入库完成
if !waitGroupWithTimeout(&s2aWg, 10*time.Minute) {
logger.Warning(fmt.Sprintf("%s 入库阶段超时 (10分钟),继续统计结果", logPrefix), owner.Email, "team")
}
// 补救后再次检查 Team 是否已满
if isTeamExhausted() {
result.AddedToS2A = int(atomic.LoadInt32(&s2aSuccessCount))
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
markOwnerResult(result.AddedToS2A > 0)
return result
}
// 统计最终结果
if len(failedSlots) > 0 {
result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots)))
}
result.AddedToS2A = int(atomic.LoadInt32(&s2aSuccessCount))
pipelineDuration := time.Since(pipelineStartTime)
logger.Info(fmt.Sprintf("%s ════════ 流水线完成 ════════ 注册: %d/%d, 入库: %d, 耗时: %.1fs",
logPrefix, result.Registered, req.MembersPerTeam, result.AddedToS2A, pipelineDuration.Seconds()), owner.Email, "team")
// 如果没有任何成员注册成功,跳过母号入库
if result.Registered == 0 {
logger.Warning(fmt.Sprintf("%s 没有成员注册成功,跳过母号入库", logPrefix), owner.Email, "team")
result.DurationMs = time.Since(startTime).Milliseconds()
markOwnerResult(false)
return result
}
// Step 4: 母号也入库(如果开启)- 带重试
if req.IncludeOwner && !isStopped() {
ownerLogPrefix := fmt.Sprintf("%s [母号 ]", logPrefix)
ownerStartTime := time.Now()
logger.Status(fmt.Sprintf("%s 母号入库中...", ownerLogPrefix), owner.Email, "team")
var ownerSuccess bool
var lastError string
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
// 检查停止信号
if isStopped() {
break
}
if attempt > 0 {
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", ownerLogPrefix, attempt+1), owner.Email, "team")
}
// 创建日志回调(输出关键日志和调试信息)
authLogger := auth.NewAuthLogger(owner.Email, logPrefix, 0, func(entry auth.AuthLogEntry) {
if entry.IsError {
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, entry.Message), owner.Email, "team")
} else {
switch entry.Step {
case auth.StepNavigate, auth.StepInputEmail, auth.StepInputPassword,
auth.StepComplete, auth.StepConsent, auth.StepSelectWorkspace:
logger.Info(fmt.Sprintf("%s %s", ownerLogPrefix, entry.Message), owner.Email, "team")
}
}
})
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
lastError = fmt.Sprintf("获取授权URL失败: %v", err)
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team")
continue
}
var code string
// 根据全局配置决定授权方式
if config.Global.AuthMethod == "api" {
// 使用纯 API 模式CodexAuth- 使用 S2A 生成的授权 URL
// 从代理池随机选择代理
proxyToUse := req.Proxy
if poolProxy, poolErr := database.Instance.GetRandomCodexProxy(); poolErr == nil && poolProxy != "" {
proxyToUse = poolProxy
logger.Info(fmt.Sprintf("%s 使用代理池: %s", ownerLogPrefix, getProxyDisplay(poolProxy)), owner.Email, "team")
}
code, err = auth.CompleteWithCodexAPI(owner.Email, owner.Password, teamID, s2aResp.Data.AuthURL, s2aResp.Data.SessionID, proxyToUse, authLogger)
// 更新代理统计
if proxyToUse != req.Proxy && proxyToUse != "" {
database.Instance.UpdateCodexProxyStats(proxyToUse, err == nil)
}
} else {
// 使用 Chromedp 浏览器自动化
code, err = auth.CompleteWithChromedpLogged(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy, authLogger)
}
if err != nil {
lastError = fmt.Sprintf("浏览器授权失败: %v", err)
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team")
continue
}
// 提交到 S2A
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
"team-"+owner.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
lastError = fmt.Sprintf("S2A提交失败: %v", err)
logger.Error(fmt.Sprintf("%s %s", ownerLogPrefix, lastError), owner.Email, "team")
continue
}
ownerSuccess = true
result.AddedToS2A++
ownerDuration := time.Since(ownerStartTime)
logger.Success(fmt.Sprintf("%s ✓ 入库成功 (总耗时: %.1fs)", ownerLogPrefix, ownerDuration.Seconds()), owner.Email, "team")
break
}
if !ownerSuccess {
result.Errors = append(result.Errors, fmt.Sprintf("母号入库失败: %s", lastError))
}
}
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team")
// 根据入库结果标记 owner 状态
// 只要有任何一个账号入库成功,就标记为已使用
markOwnerResult(result.AddedToS2A > 0)
return result
}