feat: Implement S2A account cleaner management and a global application configuration context, with supporting backend API.

This commit is contained in:
2026-01-31 02:24:08 +08:00
parent 92383f2f20
commit 634b493524
4 changed files with 251 additions and 91 deletions

View File

@@ -286,10 +286,10 @@ func runTeamProcess(req TeamProcessRequest) {
}
// processSingleTeam 处理单个 Team
func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResult) {
startTime := time.Now()
owner := req.Owners[idx]
result := TeamProcessResult{
result = TeamProcessResult{
TeamIndex: idx + 1,
OwnerEmail: owner.Email,
MemberEmails: make([]string, 0),
@@ -298,6 +298,20 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
// 固定宽度的 Team 编号 (支持到 Team 99)
logPrefix := fmt.Sprintf("[Team %2d]", idx+1)
// panic 恢复,防止单个 Team 处理崩溃影响整个任务
defer func() {
if r := recover(); r != nil {
logger.Error(fmt.Sprintf("%s 处理异常: %v", logPrefix, r), owner.Email, "team")
result.Errors = append(result.Errors, fmt.Sprintf("处理异常: %v", r))
result.DurationMs = time.Since(startTime).Milliseconds()
// 恢复为 valid 允许重试
if database.Instance != nil {
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}()
logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")
// 标记 owner 为处理中
@@ -331,9 +345,21 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
var err error
teamID, err = inviter.GetAccountID()
if err != nil {
errStr := err.Error()
result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
// Token 过期或无效,标记为 invalid 不再重试
if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") ||
strings.Contains(errStr, "unauthorized") || strings.Contains(errStr, "invalid") {
logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
}
return result
}
markOwnerResult(false)
return result
}
@@ -341,15 +367,26 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
result.TeamID = teamID
// Step 2: 测试邀请功能(检测 Team 是否被封禁)
// Step 2: 测试邀请功能(检测 Team 是否被封禁或已满
testEmail := mail.GenerateEmail()
if err := inviter.SendInvites([]string{testEmail}); err != nil {
// 邀请失败,可能是 Team 被封禁
errStr := err.Error()
// 检测 Team 已达邀请上限401 或 maximum number of seats
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
}
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 检测 Team 被封禁
if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") ||
strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") ||
strings.Contains(errStr, "deactivated") {
// Team 被封禁,标记为 invalid
logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsInvalid(owner.Email)
@@ -358,6 +395,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 其他邀请错误,继续尝试
logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team")
}
@@ -374,13 +412,32 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
var memberMu sync.Mutex
var memberWg sync.WaitGroup
// 共享标志Team 邀请已满,所有 goroutine 应停止
var teamExhausted int32
// 检查 Team 是否已满的辅助函数
isTeamExhausted := func() bool {
return atomic.LoadInt32(&teamExhausted) == 1
}
// 标记 Team 已满
markTeamExhausted := func() {
if atomic.CompareAndSwapInt32(&teamExhausted, 0, 1) {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
}
}
}
// 注册单个成员的函数带1次重试
registerMember := func(memberIdx int, email, password string) bool {
name := register.GenerateName()
birthdate := register.GenerateBirthdate()
for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次首次+1次重试
if !teamProcessState.Running {
// 检查是否应该停止
if !teamProcessState.Running || isTeamExhausted() {
return false
}
@@ -398,18 +455,19 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
errStr := err.Error()
logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
// 检测 Team 已达邀请上限
if strings.Contains(errStr, "maximum number of seats") {
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用", logPrefix), owner.Email, "team")
if database.Instance != nil {
database.Instance.MarkOwnerAsUsed(owner.Email)
}
// 跳出重试,该成员不再处理
// 检测 Team 已达邀请上限401 或 maximum number of seats
if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") {
markTeamExhausted()
return false
}
continue
}
// 再次检查是否应该停止(邀请期间其他 goroutine 可能已标记)
if isTeamExhausted() {
return false
}
// 注册
_, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy)
if err != nil {
@@ -433,6 +491,10 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
memberWg.Add(1)
go func(idx int) {
defer memberWg.Done()
// 检查是否应该停止
if isTeamExhausted() {
return
}
email := mail.GenerateEmail()
password := register.GeneratePassword()
logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team")
@@ -441,6 +503,13 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
memberWg.Wait()
// 如果 Team 已满,直接跳过补救和后续处理
if isTeamExhausted() {
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 统计失败的成员
failedSlots := make([]int, 0)
for i, c := range children {
@@ -449,9 +518,9 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
}
// 第二轮Team 有 4 次额外补救机会
// 第二轮Team 有 4 次额外补救机会(如果 Team 未满)
teamRetries := 4
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running; retry++ {
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running && !isTeamExhausted(); retry++ {
slotIdx := failedSlots[0]
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
@@ -462,6 +531,13 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
}
// 补救后再次检查 Team 是否已满
if isTeamExhausted() {
result.Errors = append(result.Errors, "Team 邀请已满")
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
// 统计注册成功数
registeredChildren := make([]MemberAccount, 0)
for _, c := range children {
@@ -477,59 +553,88 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team")
// Step 4: S2A 授权入库(成员)
// 如果没有任何成员注册成功,跳过入库步骤
if len(registeredChildren) == 0 {
logger.Warning(fmt.Sprintf("%s 没有成员注册成功,跳过入库步骤", logPrefix), owner.Email, "team")
result.DurationMs = time.Since(startTime).Milliseconds()
markOwnerResult(false)
return result
}
// Step 4: S2A 授权入库(成员)- 带重试
for i, child := range registeredChildren {
if !teamProcessState.Running {
break
}
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Member %d auth URL: %v", i+1, err))
continue
var s2aSuccess bool
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
if attempt > 0 {
logger.Warning(fmt.Sprintf("%s [成员 %d] 入库重试...", logPrefix, i+1), child.Email, "team")
}
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 获取授权URL失败: %v", logPrefix, i+1, err), child.Email, "team")
continue
}
// 根据配置选择浏览器自动化
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
} else {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
}
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 浏览器授权失败: %v", logPrefix, i+1, err), child.Email, "team")
continue
}
// 提交到 S2A
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
child.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] S2A提交失败: %v", logPrefix, i+1, err), child.Email, "team")
continue
}
s2aSuccess = true
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team")
break
}
// 根据配置选择浏览器自动化
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
} else {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy)
if !s2aSuccess {
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库失败", i+1))
}
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Member %d browser: %v", i+1, err))
continue
}
// 提交到 S2A
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
child.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Member %d S2A: %v", i+1, err))
continue
}
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team")
}
// Step 5: 母号也入库(如果开启)
// Step 5: 母号也入库(如果开启)- 带重试
if req.IncludeOwner && teamProcessState.Running {
logger.Info(fmt.Sprintf("%s 开始将母号入库到 S2A", logPrefix), owner.Email, "team")
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Owner auth URL: %v", err))
} else {
var ownerSuccess bool
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
if attempt > 0 {
logger.Warning(fmt.Sprintf("%s [母号] 入库重试...", logPrefix), owner.Email, "team")
}
s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID)
if err != nil {
logger.Error(fmt.Sprintf("%s [母号] 获取授权URL失败: %v", logPrefix, err), owner.Email, "team")
continue
}
var code string
if req.BrowserType == "rod" {
code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy)
@@ -537,26 +642,34 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy)
}
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Owner browser: %v", err))
} else {
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
owner.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("Owner S2A: %v", err))
} else {
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team")
}
logger.Error(fmt.Sprintf("%s [母号] 浏览器授权失败: %v", logPrefix, err), owner.Email, "team")
continue
}
_, err = auth.SubmitS2AOAuth(
config.Global.S2AApiBase,
config.Global.S2AAdminKey,
s2aResp.Data.SessionID,
code,
owner.Email,
config.Global.Concurrency,
config.Global.Priority,
config.Global.GroupIDs,
config.Global.ProxyID,
)
if err != nil {
logger.Error(fmt.Sprintf("%s [母号] S2A提交失败: %v", logPrefix, err), owner.Email, "team")
continue
}
ownerSuccess = true
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team")
break
}
if !ownerSuccess {
result.Errors = append(result.Errors, "母号入库失败")
}
}