@@ -539,21 +539,29 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
logger . Warning ( fmt . Sprintf ( "%s 首次邀请失败,继续尝试: %v" , logPrefix , err ) , owner . Email , "team" )
}
// Step 3: 并发注册成员
// 每个成员:邀请 → 注册, 失败重试1次
// Step 3: 流水线模式 - 注册成功的成员立即开始入库
// 每个成员:邀请 → 注册 → 入库 , 失败重试1次
// Team 有4次额外补救机会
type MemberAccount struct {
Email string
Password string
Success bool
S2ADone bool // 入库是否完成
S2AOK bool // 入库是否成功
}
children := make ( [ ] MemberAccount , req . MembersPerTeam )
var memberMu sync . Mutex
var memberWg sync . WaitGroup
// 共享标志: Team 邀请已满,所有 goroutine 应停止
var teamExhausted int32
// 入库计数器
var s2aSuccessCount int32
var s2aFailCount int32
// 入库并发控制信号量
s2aSem := make ( chan struct { } , req . ConcurrentS2A )
// 检查 Team 是否已满的辅助函数
isTeamExhausted := func ( ) bool {
return atomic . LoadInt32 ( & teamExhausted ) == 1
@@ -571,8 +579,106 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
}
}
// 注册 单个成员的函数( 带1次重试)
registerMember := func ( memberIdx int , e mail, p assword string ) bool {
// 入库 单个成员的函数
doS2A := func ( memberIdx int , memberE mail, memberP assword string ) bool {
memberLogPrefix := fmt . Sprintf ( "%s [成员 %d]" , logPrefix , memberIdx + 1 )
memberStartTime := time . Now ( )
// 获取入库信号量
s2aSem <- struct { } { }
defer func ( ) { <- s2aSem } ( )
logger . Status ( fmt . Sprintf ( "%s 入库中... | 邮箱: %s" , memberLogPrefix , memberEmail ) , memberEmail , "team" )
var s2aSuccess bool
var lastError string
for attempt := 0 ; attempt < 2 ; attempt ++ { // 最多重试1次
if attempt > 0 {
logger . Warning ( fmt . Sprintf ( "%s 入库重试 (第%d次)" , memberLogPrefix , attempt + 1 ) , memberEmail , "team" )
}
// 创建日志回调
authLogger := auth . NewAuthLogger ( memberEmail , logPrefix , memberIdx + 1 , func ( entry auth . AuthLogEntry ) {
if entry . IsError {
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , entry . Message ) , memberEmail , "team" )
} else {
switch entry . Step {
case auth . StepNavigate , auth . StepInputEmail , auth . StepInputPassword ,
auth . StepComplete , auth . StepConsent , auth . StepSelectWorkspace :
logger . Info ( fmt . Sprintf ( "%s %s" , memberLogPrefix , entry . Message ) , memberEmail , "team" )
}
}
} )
// 获取授权 URL
s2aResp , err := auth . GenerateS2AAuthURL ( config . Global . S2AApiBase , config . Global . S2AAdminKey , config . Global . ProxyID )
if err != nil {
lastError = fmt . Sprintf ( "获取授权URL失败: %v" , err )
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , lastError ) , memberEmail , "team" )
continue
}
// 根据配置选择授权方式
var code string
if config . Global . AuthMethod == "api" {
proxyToUse := req . Proxy
if poolProxy , poolErr := database . Instance . GetRandomCodexProxy ( ) ; poolErr == nil && poolProxy != "" {
proxyToUse = poolProxy
logger . Info ( fmt . Sprintf ( "%s 使用代理池: %s" , memberLogPrefix , getProxyDisplay ( poolProxy ) ) , memberEmail , "team" )
}
code , err = auth . CompleteWithCodexAPI ( memberEmail , memberPassword , teamID , s2aResp . Data . AuthURL , s2aResp . Data . SessionID , proxyToUse , authLogger )
if proxyToUse != req . Proxy && proxyToUse != "" {
database . Instance . UpdateCodexProxyStats ( proxyToUse , err == nil )
}
} else {
code , err = auth . CompleteWithChromedpLogged ( s2aResp . Data . AuthURL , memberEmail , memberPassword , teamID , req . Headless , req . Proxy , authLogger )
}
if err != nil {
lastError = fmt . Sprintf ( "浏览器授权失败: %v" , err )
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , lastError ) , memberEmail , "team" )
continue
}
// 提交到 S2A
_ , err = auth . SubmitS2AOAuth (
config . Global . S2AApiBase ,
config . Global . S2AAdminKey ,
s2aResp . Data . SessionID ,
code ,
memberEmail ,
config . Global . Concurrency ,
config . Global . Priority ,
config . Global . GroupIDs ,
config . Global . ProxyID ,
)
if err != nil {
lastError = fmt . Sprintf ( "S2A提交失败: %v" , err )
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , lastError ) , memberEmail , "team" )
continue
}
s2aSuccess = true
memberDuration := time . Since ( memberStartTime )
logger . Success ( fmt . Sprintf ( "%s ✓ 入库成功 (总耗时: %.1fs)" , memberLogPrefix , memberDuration . Seconds ( ) ) , memberEmail , "team" )
break
}
if s2aSuccess {
atomic . AddInt32 ( & s2aSuccessCount , 1 )
} else {
atomic . AddInt32 ( & s2aFailCount , 1 )
memberMu . Lock ( )
result . Errors = append ( result . Errors , fmt . Sprintf ( "成员 %d 入库失败: %s" , memberIdx + 1 , lastError ) )
memberMu . Unlock ( )
}
return s2aSuccess
}
// 注册并入库单个成员的函数( 带1次重试) - 流水线模式
var s2aWg sync . WaitGroup
registerAndS2AMember := func ( memberIdx int , email , password string ) bool {
name := register . GenerateName ( )
birthdate := register . GenerateBirthdate ( )
memberLogPrefix := fmt . Sprintf ( "%s [成员 %d]" , logPrefix , memberIdx + 1 )
@@ -618,24 +724,40 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
continue
}
// 成功
// 注册 成功
regDuration := time . Since ( regStartTime )
memberMu . Lock ( )
children [ memberIdx ] = MemberAccount { Email : currentEmail , Password : currentPassword , Success : true }
result . MemberEmails = append ( result . MemberEmails , currentEmail )
result . Registered ++
memberMu . Unlock ( )
logger . Success ( fmt . Sprintf ( "%s ✓ 注册成功 (耗时: %.1fs)" , memberLogPrefix , regDuration . Seconds ( ) ) , currentEmail , "team" )
// 流水线:注册成功后立即启动入库(异步)
s2aWg . Add ( 1 )
go func ( idx int , e , p string ) {
defer s2aWg . Done ( )
success := doS2A ( idx , e , p )
memberMu . Lock ( )
children [ idx ] . S2ADone = true
children [ idx ] . S2AOK = success
memberMu . Unlock ( )
} ( memberIdx , currentEmail , currentPassword )
return true
}
return false
}
// 第一轮:并发注册4个 成员
logger . Info ( fmt . Sprintf ( "%s ════════ 开始注册阶段 ════════ 目标: %d 个成员" , logPrefix , req . MembersPerTeam ) , owner . Email , "team" )
regPhas eStartTime := time . Now ( )
// 第一轮:并发注册成员(注册成功后立即入库)
logger . Info ( fmt . Sprintf ( "%s ════════ 开始流水线处理 ════════ 目标: %d 个成员" , logPrefix , req . MembersPerTeam ) , owner . Email , "team" )
pipelin eStartTime := time . Now ( )
var regWg sync . WaitGroup
for i := 0 ; i < req . MembersPerTeam ; i ++ {
member Wg. Add ( 1 )
reg Wg. Add ( 1 )
go func ( idx int ) {
defer member Wg. Done ( )
defer reg Wg. Done ( )
// 检查是否应该停止
if isTeamExhausted ( ) {
return
@@ -643,13 +765,15 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
email := mail . GenerateEmail ( )
password := register . GeneratePassword ( )
logger . Info ( fmt . Sprintf ( "%s [成员 %d] 邮箱: %s | 密码: %s" , logPrefix , idx + 1 , email , password ) , email , "team" )
registerMember ( idx , email , password )
registerAndS2A Member ( idx , email , password )
} ( i )
}
member Wg. Wait ( )
reg Wg. Wait ( )
// 如果 Team 已满,直接跳过补救和后续处理
// 如果 Team 已满,等待已启动的入库完成
if isTeamExhausted ( ) {
s2aWg . Wait ( )
result . AddedToS2A = int ( atomic . LoadInt32 ( & s2aSuccessCount ) )
result . Errors = append ( result . Errors , "Team 邀请已满" )
result . DurationMs = time . Since ( startTime ) . Milliseconds ( )
return result
@@ -671,184 +795,41 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
email := mail . GenerateEmail ( )
password := register . GeneratePassword ( )
if registerMember ( slotIdx , email , password ) {
if registerAndS2A Member ( slotIdx , email , password ) {
failedSlots = failedSlots [ 1 : ] // 成功,移除这个槽位
}
}
// 等待所有入库完成
s2aWg . Wait ( )
// 补救后再次检查 Team 是否已满
if isTeamExhausted ( ) {
result . AddedToS2A = int ( atomic . LoadInt32 ( & s2aSuccessCount ) )
result . Errors = append ( result . Errors , "Team 邀请已满" )
result . DurationMs = time . Since ( startTime ) . Milliseconds ( )
return result
}
// 统计注册成功数
registeredChildren := make ( [ ] MemberAccount , 0 )
for _ , c := range children {
if c . Success {
registeredChildren = append ( registeredChildren , c )
result . MemberEmails = append ( result . MemberEmails , c . Email )
result . Registered ++
}
}
// 统计最终结果
if len ( failedSlots ) > 0 {
result . Errors = append ( result . Errors , fmt . Sprintf ( "%d 个成员注册失败" , len ( failedSlots ) ) )
}
regPhaseDuration := time . Since ( regPhaseStartTime )
logger . Info ( fmt . Sprintf ( "%s ════════ 注册阶段完成 ════════ 成功: %d/%d, 耗时: %.1fs" , logPrefix , result . Registered , req . MembersPerTeam , regPhaseDuration . Seconds ( ) ) , owner . Email , "team" )
// 如果没有任何成员注册成功,跳过入库步骤
if len ( registeredChildren ) == 0 {
logger . Warning ( fmt . Sprintf ( "%s 没有成员注册成功,跳过入库步骤" , logPrefix ) , owner . Email , "team" )
result . AddedToS2A = int ( atomic . LoadInt32 ( & s2aSuccessCount ) )
pipelineDuration := time . Since ( pipelineStartTime )
logger . Info ( fmt . Sprintf ( "%s ════════ 流水线完成 ════════ 注册: %d/%d, 入库: %d, 耗时: %.1fs" ,
logPrefix , result . Registered , req . MembersPerTeam , result . AddedToS2A , pipelineDuration . Seconds ( ) ) , owner . Email , "team" )
// 如果没有任何成员注册成功,跳过母号入库
if result . Registered == 0 {
logger . Warning ( fmt . Sprintf ( "%s 没有成员注册成功,跳过母号入库" , logPrefix ) , owner . Email , "team" )
result . DurationMs = time . Since ( startTime ) . Milliseconds ( )
markOwnerResult ( false )
return result
}
// Step 4: S2A 授权入库(成员)- 并发入库
logger . Info ( fmt . Sprintf ( "%s ════════ 开始入库阶段 ════════ 共 %d 个成员, 并发数: %d" , logPrefix , len ( registeredChildren ) , req . ConcurrentS2A ) , owner . Email , "team" )
s2aStartTime := time . Now ( )
// 入库结果
type S2AResult struct {
Index int
Email string
Success bool
Error string
}
s2aResults := make ( chan S2AResult , len ( registeredChildren ) )
s2aSem := make ( chan struct { } , req . ConcurrentS2A ) // 并发控制信号量
var s2aWg sync . WaitGroup
for i , child := range registeredChildren {
if ! teamProcessState . Running {
break
}
s2aWg . Add ( 1 )
go func ( memberIdx int , memberChild MemberAccount ) {
defer s2aWg . Done ( )
// 获取信号量
s2aSem <- struct { } { }
defer func ( ) { <- s2aSem } ( )
memberStartTime := time . Now ( )
memberLogPrefix := fmt . Sprintf ( "%s [成员 %d]" , logPrefix , memberIdx + 1 )
logger . Status ( fmt . Sprintf ( "%s 入库中... | 邮箱: %s" , memberLogPrefix , memberChild . Email ) , memberChild . Email , "team" )
var s2aSuccess bool
var lastError string
for attempt := 0 ; attempt < 2 ; attempt ++ { // 最多重试1次
if attempt > 0 {
logger . Warning ( fmt . Sprintf ( "%s 入库重试 (第%d次)" , memberLogPrefix , attempt + 1 ) , memberChild . Email , "team" )
}
// 创建日志回调(输出关键日志和调试信息)
authLogger := auth . NewAuthLogger ( memberChild . Email , logPrefix , memberIdx + 1 , func ( entry auth . AuthLogEntry ) {
if entry . IsError {
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , entry . Message ) , memberChild . Email , "team" )
} else {
// 输出关键步骤:导航、输入、完成等
switch entry . Step {
case auth . StepNavigate , auth . StepInputEmail , auth . StepInputPassword ,
auth . StepComplete , auth . StepConsent , auth . StepSelectWorkspace :
logger . Info ( fmt . Sprintf ( "%s %s" , memberLogPrefix , entry . Message ) , memberChild . Email , "team" )
}
}
} )
// 获取授权 URL
s2aResp , err := auth . GenerateS2AAuthURL ( config . Global . S2AApiBase , config . Global . S2AAdminKey , config . Global . ProxyID )
if err != nil {
lastError = fmt . Sprintf ( "获取授权URL失败: %v" , err )
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , lastError ) , memberChild . Email , "team" )
continue
}
// 根据配置选择授权方式
var code string
if config . Global . AuthMethod == "api" {
// 使用纯 API 模式( CodexAuth) - 使用 S2A 生成的授权 URL
// 从代理池随机选择代理
proxyToUse := req . Proxy
if poolProxy , poolErr := database . Instance . GetRandomCodexProxy ( ) ; poolErr == nil && poolProxy != "" {
proxyToUse = poolProxy
logger . Info ( fmt . Sprintf ( "%s 使用代理池: %s" , memberLogPrefix , getProxyDisplay ( poolProxy ) ) , memberChild . Email , "team" )
}
code , err = auth . CompleteWithCodexAPI ( memberChild . Email , memberChild . Password , teamID , s2aResp . Data . AuthURL , s2aResp . Data . SessionID , proxyToUse , authLogger )
// 更新代理统计
if proxyToUse != req . Proxy && proxyToUse != "" {
database . Instance . UpdateCodexProxyStats ( proxyToUse , err == nil )
}
} else {
// 使用 Chromedp 浏览器自动化
code , err = auth . CompleteWithChromedpLogged ( s2aResp . Data . AuthURL , memberChild . Email , memberChild . Password , teamID , req . Headless , req . Proxy , authLogger )
}
if err != nil {
lastError = fmt . Sprintf ( "浏览器授权失败: %v" , err )
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , lastError ) , memberChild . Email , "team" )
continue
}
// 提交到 S2A
_ , err = auth . SubmitS2AOAuth (
config . Global . S2AApiBase ,
config . Global . S2AAdminKey ,
s2aResp . Data . SessionID ,
code ,
memberChild . Email ,
config . Global . Concurrency ,
config . Global . Priority ,
config . Global . GroupIDs ,
config . Global . ProxyID ,
)
if err != nil {
lastError = fmt . Sprintf ( "S2A提交失败: %v" , err )
logger . Error ( fmt . Sprintf ( "%s %s" , memberLogPrefix , lastError ) , memberChild . Email , "team" )
continue
}
s2aSuccess = true
memberDuration := time . Since ( memberStartTime )
logger . Success ( fmt . Sprintf ( "%s ✓ 入库成功 (总耗时: %.1fs)" , memberLogPrefix , memberDuration . Seconds ( ) ) , memberChild . Email , "team" )
break
}
s2aResults <- S2AResult {
Index : memberIdx ,
Email : memberChild . Email ,
Success : s2aSuccess ,
Error : lastError ,
}
} ( i , child )
}
// 等待所有入库完成
go func ( ) {
s2aWg . Wait ( )
close ( s2aResults )
} ( )
// 收集入库结果
for s2aRes := range s2aResults {
if s2aRes . Success {
result . AddedToS2A ++
} else {
result . Errors = append ( result . Errors , fmt . Sprintf ( "成员 %d 入库失败: %s" , s2aRes . Index + 1 , s2aRes . Error ) )
}
}
s2aDuration := time . Since ( s2aStartTime )
logger . Info ( fmt . Sprintf ( "%s ════════ 入库阶段完成 ════════ 成功: %d/%d, 耗时: %.1fs" , logPrefix , result . AddedToS2A , len ( registeredChildren ) , s2aDuration . Seconds ( ) ) , owner . Email , "team" )
// Step 5: 母号也入库(如果开启)- 带重试
// Step 4: 母号也入库(如果开启)- 带重试
if req . IncludeOwner && teamProcessState . Running {
ownerLogPrefix := fmt . Sprintf ( "%s [母号 ]" , logPrefix )
ownerStartTime := time . Now ( )