diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index 8a15780..0b22192 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -730,6 +730,9 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul var s2aSuccessCount int32 var s2aFailCount int32 + // Team 级别 500 错误熔断器:当同 Team 多个成员连续遇到 500 时快速失败 + var consecutive500Fails int32 + // 入库并发控制信号量 s2aSem := make(chan struct{}, req.ConcurrentS2A) @@ -779,14 +782,23 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul var s2aSuccess bool var lastError string - for attempt := 0; attempt < 3; attempt++ { // 最多重试2次 + for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 // 检查停止信号 if isStopped() { return false } + // 熔断检查:同 Team 已有 3+ 个成员连续 500 失败,快速跳过 + if atomic.LoadInt32(&consecutive500Fails) >= 3 { + logger.Warning(fmt.Sprintf("%s 同 Team 已有多个成员连续 500 失败,跳过入库", memberLogPrefix), memberEmail, "team") + atomic.AddInt32(&s2aFailCount, 1) + memberMu.Lock() + result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库跳过: 服务器持续 500 (熔断)", memberIdx+1)) + memberMu.Unlock() + return false + } if attempt > 0 { - // 重试前等待一段时间,避免密集请求(可被停止信号中断) - retryDelay := time.Duration(3+attempt*2) * time.Second + // 重试前短暂等待(可被停止信号中断) + retryDelay := time.Duration(3) * time.Second logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team") select { case <-time.After(retryDelay): @@ -847,8 +859,14 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul if err != nil { lastError = fmt.Sprintf("浏览器授权失败: %v", err) logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team") + // 跟踪 500 错误用于熔断 + if strings.Contains(err.Error(), "500") || strings.Contains(err.Error(), "重试已耗尽") { + atomic.AddInt32(&consecutive500Fails, 1) + } continue } + // 授权成功,重置 500 计数 + atomic.StoreInt32(&consecutive500Fails, 0) // 提交到 S2A _, err = auth.SubmitS2AOAuth( @@ -964,8 +982,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul s2aWg.Add(1) go func(idx int, e, p string) { defer s2aWg.Done() - // 基础 3 秒 + 成员索引 * 2 秒错开 + 0~2 秒随机抖动,避免同 Team 多成员同时选工作区 - stagger := 3*time.Second + time.Duration(idx*2)*time.Second + time.Duration(rand.Intn(2000))*time.Millisecond + // 基础 1 秒 + 成员索引 * 1 秒错开 + 0~1 秒随机抖动 + stagger := 1*time.Second + time.Duration(idx)*time.Second + time.Duration(rand.Intn(1000))*time.Millisecond select { case <-time.After(stagger): case <-teamProcessState.stopCh: diff --git a/backend/internal/auth/codex_api.go b/backend/internal/auth/codex_api.go index cbcbaea..c4b0178 100644 --- a/backend/internal/auth/codex_api.go +++ b/backend/internal/auth/codex_api.go @@ -296,7 +296,7 @@ func (c *CodexAPIAuth) GetSessionID() string { return c.sessionID } -// ObtainAuthorizationCode 获取授权码(全局 3 分钟超时) +// ObtainAuthorizationCode 获取授权码(全局 90 秒超时) func (c *CodexAPIAuth) ObtainAuthorizationCode() (string, error) { type authResult struct { code string @@ -310,8 +310,8 @@ func (c *CodexAPIAuth) ObtainAuthorizationCode() (string, error) { select { case r := <-resultCh: return r.code, r.err - case <-time.After(3 * time.Minute): - return "", fmt.Errorf("授权超时 (3分钟)") + case <-time.After(90 * time.Second): + return "", fmt.Errorf("授权超时 (90秒)") } } @@ -519,13 +519,13 @@ func (c *CodexAPIAuth) obtainAuthorizationCodeInternal() (string, error) { "workspace_id": c.workspaceID, } - // 添加 500 错误重试机制 - 最多重试 5 次,指数退避 + 随机抖动 + // 添加 500 错误重试机制 - 最多重试 3 次,短退避 + 随机抖动 var lastErr error - for retry := 0; retry < 5; retry++ { + for retry := 0; retry < 3; retry++ { if retry > 0 { - // 指数退避: 3s, 5s, 8s, 12s 基础延迟 + 0~3s 随机抖动 - baseDelay := time.Duration(3+retry*2) * time.Second - jitter := time.Duration(rand.Intn(3000)) * time.Millisecond + // 短退避: 2s, 4s 基础延迟 + 0~2s 随机抖动 + baseDelay := time.Duration(2+retry*2) * time.Second + jitter := time.Duration(rand.Intn(2000)) * time.Millisecond delay := baseDelay + jitter c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区 (等待 %.1fs)...", retry+1, delay.Seconds()) time.Sleep(delay) @@ -705,6 +705,10 @@ func CompleteWithCodexAPI(email, password, workspaceID, authURL, sessionID, prox code, err := auth.ObtainAuthorizationCode() if err != nil { auth.tlsClient.Close() + // 如果内层重试已耗尽(持续 500),直接返回不再外层重试 + if strings.Contains(err.Error(), "重试已耗尽") || strings.Contains(err.Error(), "授权超时") { + return "", err + } // 检查是否为 403 错误 if strings.Contains(err.Error(), "403") { lastErr = err diff --git a/backend/internal/auth/s2a.go b/backend/internal/auth/s2a.go index 62c9e8e..2eebed6 100644 --- a/backend/internal/auth/s2a.go +++ b/backend/internal/auth/s2a.go @@ -133,7 +133,7 @@ func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concu var lastErr error for attempt := 0; attempt < 3; attempt++ { if attempt > 0 { - time.Sleep(time.Duration(2+attempt*2) * time.Second) // 4s, 6s + time.Sleep(time.Duration(1+attempt) * time.Second) // 2s, 3s } req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))