diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index ddc2242..52ac28c 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -3,6 +3,7 @@ package api import ( "encoding/json" "fmt" + "math/rand" "net/http" "strings" "sync" @@ -775,24 +776,31 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul } defer func() { <-s2aSem }() - // 从代理池获取随机代理(默认轮询使用代理池,无代理则直连) - proxyToUse, _ := database.Instance.GetRandomCodexProxy() - proxyDisplay := "直连" - if proxyToUse != "" { - proxyDisplay = getProxyDisplay(proxyToUse) - } - logger.Status(fmt.Sprintf("%s 入库中... | 邮箱: %s | 代理: %s", memberLogPrefix, memberEmail, proxyDisplay), memberEmail, "team") - var s2aSuccess bool var lastError string - for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 + for attempt := 0; attempt < 3; attempt++ { // 最多重试2次 // 检查停止信号 if isStopped() { return false } if attempt > 0 { - logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", memberLogPrefix, attempt+1), memberEmail, "team") + // 重试前等待一段时间,避免密集请求 + retryDelay := time.Duration(3+attempt*2) * time.Second + logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team") + time.Sleep(retryDelay) + } + + // 每次尝试都从代理池获取新代理(避免复用失败代理) + proxyToUse, _ := database.Instance.GetRandomCodexProxy() + proxyDisplay := "直连" + if proxyToUse != "" { + proxyDisplay = getProxyDisplay(proxyToUse) + } + if attempt == 0 { + logger.Status(fmt.Sprintf("%s 入库中... | 邮箱: %s | 代理: %s", memberLogPrefix, memberEmail, proxyDisplay), memberEmail, "team") + } else { + logger.Status(fmt.Sprintf("%s 入库重试中... | 代理: %s", memberLogPrefix, proxyDisplay), memberEmail, "team") } // 创建日志回调 - 只显示关键步骤 @@ -948,13 +956,14 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul memberMu.Unlock() logger.Success(fmt.Sprintf("%s ✓ 注册成功 (耗时: %.1fs)", memberLogPrefix, regDuration.Seconds()), currentEmail, "team") - // 流水线:注册成功后等待 3 秒再开始入库(避免触发邮箱验证) + // 流水线:注册成功后等待再开始入库(避免触发邮箱验证 + 成员间错开避免并发冲突) s2aWg.Add(1) go func(idx int, e, p string) { defer s2aWg.Done() - // 等待 3 秒,让账号状态稳定(可被停止信号中断) + // 基础 3 秒 + 成员索引 * 2 秒错开 + 0~2 秒随机抖动,避免同 Team 多成员同时选工作区 + stagger := 3*time.Second + time.Duration(idx*2)*time.Second + time.Duration(rand.Intn(2000))*time.Millisecond select { - case <-time.After(3 * time.Second): + case <-time.After(stagger): case <-teamProcessState.stopCh: return } diff --git a/backend/internal/auth/codex_api.go b/backend/internal/auth/codex_api.go index 88cfb3a..cbcbaea 100644 --- a/backend/internal/auth/codex_api.go +++ b/backend/internal/auth/codex_api.go @@ -519,12 +519,16 @@ func (c *CodexAPIAuth) obtainAuthorizationCodeInternal() (string, error) { "workspace_id": c.workspaceID, } - // 添加 500 错误重试机制 - 最多重试 3 次 + // 添加 500 错误重试机制 - 最多重试 5 次,指数退避 + 随机抖动 var lastErr error - for retry := 0; retry < 3; retry++ { + for retry := 0; retry < 5; retry++ { if retry > 0 { - c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区...", retry+1) - time.Sleep(time.Duration(2+retry) * time.Second) // 递增延迟: 2s, 3s, 4s + // 指数退避: 3s, 5s, 8s, 12s 基础延迟 + 0~3s 随机抖动 + baseDelay := time.Duration(3+retry*2) * time.Second + jitter := time.Duration(rand.Intn(3000)) * time.Millisecond + delay := baseDelay + jitter + c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区 (等待 %.1fs)...", retry+1, delay.Seconds()) + time.Sleep(delay) // 重新获取 Sentinel token if !c.callSentinelReq("password_verify__auto") { @@ -575,6 +579,13 @@ func (c *CodexAPIAuth) obtainAuthorizationCodeInternal() (string, error) { return "", fmt.Errorf("未能获取授权码") } + // 429 限流,可重试 + if resp.StatusCode == 429 { + c.logStep(StepSelectWorkspace, "请求限流 429,将重试...") + lastErr = fmt.Errorf("请求限流: 429") + continue + } + // 5xx 服务器错误,可重试 if resp.StatusCode >= 500 && resp.StatusCode < 600 { c.logStep(StepSelectWorkspace, "服务器错误 %d,将重试...", resp.StatusCode) diff --git a/backend/internal/auth/s2a.go b/backend/internal/auth/s2a.go index 069f0d8..62c9e8e 100644 --- a/backend/internal/auth/s2a.go +++ b/backend/internal/auth/s2a.go @@ -113,9 +113,9 @@ func GenerateS2AAuthURL(s2aAPIBase, s2aAdminKey string, proxyID *int) (*S2AAuthU return &result, nil } -// SubmitS2AOAuth 提交 OAuth code 到 S2A 入库 +// SubmitS2AOAuth 提交 OAuth code 到 S2A 入库(带重试) func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concurrency, priority int, groupIDs []int, proxyID *int) (*S2ACreateFromOAuthResponse, error) { - client := &http.Client{Timeout: 30 * time.Second} + httpClient := &http.Client{Timeout: 30 * time.Second} apiURL := s2aAPIBase + "/api/v1/admin/openai/create-from-oauth" @@ -130,29 +130,50 @@ func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concu } body, _ := json.Marshal(payload) - req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body)) - req.Header.Set("Accept", "application/json") - req.Header.Set("Content-Type", "application/json") - req.Header.Set("X-Api-Key", s2aAdminKey) + var lastErr error + for attempt := 0; attempt < 3; attempt++ { + if attempt > 0 { + time.Sleep(time.Duration(2+attempt*2) * time.Second) // 4s, 6s + } - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("请求失败: %v", err) - } - defer resp.Body.Close() + req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body)) + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-Api-Key", s2aAdminKey) - respBody, _ := io.ReadAll(resp.Body) + resp, err := httpClient.Do(req) + if err != nil { + lastErr = fmt.Errorf("请求失败: %v", err) + continue // 网络错误可重试 + } - var result S2ACreateFromOAuthResponse - if err := json.Unmarshal(respBody, &result); err != nil { - return nil, fmt.Errorf("解析响应失败: %v", err) + respBody, _ := io.ReadAll(resp.Body) + resp.Body.Close() + + // 5xx 服务端错误可重试 + if resp.StatusCode >= 500 { + lastErr = fmt.Errorf("S2A 服务端错误 HTTP %d: %s", resp.StatusCode, string(respBody[:min(200, len(respBody))])) + continue + } + + // 非 200 的其他错误不重试 + if resp.StatusCode != 200 { + return nil, fmt.Errorf("S2A HTTP %d: %s", resp.StatusCode, string(respBody[:min(200, len(respBody))])) + } + + var result S2ACreateFromOAuthResponse + if err := json.Unmarshal(respBody, &result); err != nil { + return nil, fmt.Errorf("解析响应失败 (HTTP %d): %v, body: %s", resp.StatusCode, err, string(respBody[:min(200, len(respBody))])) + } + + if result.Code != 0 { + return nil, fmt.Errorf("S2A 入库失败: %s (code=%d)", result.Message, result.Code) + } + + return &result, nil } - if result.Code != 0 { - return nil, fmt.Errorf("S2A 入库失败: %s", result.Message) - } - - return &result, nil + return nil, fmt.Errorf("S2A 入库失败 (重试耗尽): %v", lastErr) } // VerifyS2AAccount 验证账号入库状态