feat(auth): Add retry logic with exponential backoff for S2A and Codex API
- Add exponential backoff with jitter to S2A OAuth submission (3 attempts)
- Implement 5xx error retry mechanism in Codex API workspace selection (5 attempts)
- Add 429 rate limit handling with retry support in Codex API
- Improve team member processing with staggered delays to avoid concurrent conflicts
- Add per-attempt proxy rotation to avoid reusing failed proxies
- Enhance retry delay calculation with random jitter to prevent thundering herd
- Update logging to display retry attempts and delay durations
- Improve error messages with HTTP status codes and response body snippets
- Refactor retry loops to use consistent exponential backoff pattern across modules
This commit is contained in:
@@ -3,6 +3,7 @@ package api
|
|||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"math/rand"
|
||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@@ -775,24 +776,31 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
|||||||
}
|
}
|
||||||
defer func() { <-s2aSem }()
|
defer func() { <-s2aSem }()
|
||||||
|
|
||||||
// 从代理池获取随机代理(默认轮询使用代理池,无代理则直连)
|
|
||||||
proxyToUse, _ := database.Instance.GetRandomCodexProxy()
|
|
||||||
proxyDisplay := "直连"
|
|
||||||
if proxyToUse != "" {
|
|
||||||
proxyDisplay = getProxyDisplay(proxyToUse)
|
|
||||||
}
|
|
||||||
logger.Status(fmt.Sprintf("%s 入库中... | 邮箱: %s | 代理: %s", memberLogPrefix, memberEmail, proxyDisplay), memberEmail, "team")
|
|
||||||
|
|
||||||
var s2aSuccess bool
|
var s2aSuccess bool
|
||||||
var lastError string
|
var lastError string
|
||||||
|
|
||||||
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
|
for attempt := 0; attempt < 3; attempt++ { // 最多重试2次
|
||||||
// 检查停止信号
|
// 检查停止信号
|
||||||
if isStopped() {
|
if isStopped() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if attempt > 0 {
|
if attempt > 0 {
|
||||||
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", memberLogPrefix, attempt+1), memberEmail, "team")
|
// 重试前等待一段时间,避免密集请求
|
||||||
|
retryDelay := time.Duration(3+attempt*2) * time.Second
|
||||||
|
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team")
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 每次尝试都从代理池获取新代理(避免复用失败代理)
|
||||||
|
proxyToUse, _ := database.Instance.GetRandomCodexProxy()
|
||||||
|
proxyDisplay := "直连"
|
||||||
|
if proxyToUse != "" {
|
||||||
|
proxyDisplay = getProxyDisplay(proxyToUse)
|
||||||
|
}
|
||||||
|
if attempt == 0 {
|
||||||
|
logger.Status(fmt.Sprintf("%s 入库中... | 邮箱: %s | 代理: %s", memberLogPrefix, memberEmail, proxyDisplay), memberEmail, "team")
|
||||||
|
} else {
|
||||||
|
logger.Status(fmt.Sprintf("%s 入库重试中... | 代理: %s", memberLogPrefix, proxyDisplay), memberEmail, "team")
|
||||||
}
|
}
|
||||||
|
|
||||||
// 创建日志回调 - 只显示关键步骤
|
// 创建日志回调 - 只显示关键步骤
|
||||||
@@ -948,13 +956,14 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
|||||||
memberMu.Unlock()
|
memberMu.Unlock()
|
||||||
logger.Success(fmt.Sprintf("%s ✓ 注册成功 (耗时: %.1fs)", memberLogPrefix, regDuration.Seconds()), currentEmail, "team")
|
logger.Success(fmt.Sprintf("%s ✓ 注册成功 (耗时: %.1fs)", memberLogPrefix, regDuration.Seconds()), currentEmail, "team")
|
||||||
|
|
||||||
// 流水线:注册成功后等待 3 秒再开始入库(避免触发邮箱验证)
|
// 流水线:注册成功后等待再开始入库(避免触发邮箱验证 + 成员间错开避免并发冲突)
|
||||||
s2aWg.Add(1)
|
s2aWg.Add(1)
|
||||||
go func(idx int, e, p string) {
|
go func(idx int, e, p string) {
|
||||||
defer s2aWg.Done()
|
defer s2aWg.Done()
|
||||||
// 等待 3 秒,让账号状态稳定(可被停止信号中断)
|
// 基础 3 秒 + 成员索引 * 2 秒错开 + 0~2 秒随机抖动,避免同 Team 多成员同时选工作区
|
||||||
|
stagger := 3*time.Second + time.Duration(idx*2)*time.Second + time.Duration(rand.Intn(2000))*time.Millisecond
|
||||||
select {
|
select {
|
||||||
case <-time.After(3 * time.Second):
|
case <-time.After(stagger):
|
||||||
case <-teamProcessState.stopCh:
|
case <-teamProcessState.stopCh:
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -519,12 +519,16 @@ func (c *CodexAPIAuth) obtainAuthorizationCodeInternal() (string, error) {
|
|||||||
"workspace_id": c.workspaceID,
|
"workspace_id": c.workspaceID,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 添加 500 错误重试机制 - 最多重试 3 次
|
// 添加 500 错误重试机制 - 最多重试 5 次,指数退避 + 随机抖动
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for retry := 0; retry < 3; retry++ {
|
for retry := 0; retry < 5; retry++ {
|
||||||
if retry > 0 {
|
if retry > 0 {
|
||||||
c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区...", retry+1)
|
// 递增退避(线性): 5s, 7s, 9s, 11s 基础延迟 + 0~3s 随机抖动
|
||||||
time.Sleep(time.Duration(2+retry) * time.Second) // 递增延迟: 2s, 3s, 4s
|
baseDelay := time.Duration(3+retry*2) * time.Second
|
||||||
|
jitter := time.Duration(rand.Intn(3000)) * time.Millisecond
|
||||||
|
delay := baseDelay + jitter
|
||||||
|
c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区 (等待 %.1fs)...", retry+1, delay.Seconds())
|
||||||
|
time.Sleep(delay)
|
||||||
|
|
||||||
// 重新获取 Sentinel token
|
// 重新获取 Sentinel token
|
||||||
if !c.callSentinelReq("password_verify__auto") {
|
if !c.callSentinelReq("password_verify__auto") {
|
||||||
@@ -575,6 +579,13 @@ func (c *CodexAPIAuth) obtainAuthorizationCodeInternal() (string, error) {
|
|||||||
return "", fmt.Errorf("未能获取授权码")
|
return "", fmt.Errorf("未能获取授权码")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 429 限流,可重试
|
||||||
|
if resp.StatusCode == 429 {
|
||||||
|
c.logStep(StepSelectWorkspace, "请求限流 429,将重试...")
|
||||||
|
lastErr = fmt.Errorf("请求限流: 429")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
// 5xx 服务器错误,可重试
|
// 5xx 服务器错误,可重试
|
||||||
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
|
if resp.StatusCode >= 500 && resp.StatusCode < 600 {
|
||||||
c.logStep(StepSelectWorkspace, "服务器错误 %d,将重试...", resp.StatusCode)
|
c.logStep(StepSelectWorkspace, "服务器错误 %d,将重试...", resp.StatusCode)
|
||||||
|
|||||||
@@ -113,9 +113,9 @@ func GenerateS2AAuthURL(s2aAPIBase, s2aAdminKey string, proxyID *int) (*S2AAuthU
|
|||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SubmitS2AOAuth 提交 OAuth code 到 S2A 入库
|
// SubmitS2AOAuth 提交 OAuth code 到 S2A 入库(带重试)
|
||||||
func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concurrency, priority int, groupIDs []int, proxyID *int) (*S2ACreateFromOAuthResponse, error) {
|
func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concurrency, priority int, groupIDs []int, proxyID *int) (*S2ACreateFromOAuthResponse, error) {
|
||||||
client := &http.Client{Timeout: 30 * time.Second}
|
httpClient := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
|
||||||
apiURL := s2aAPIBase + "/api/v1/admin/openai/create-from-oauth"
|
apiURL := s2aAPIBase + "/api/v1/admin/openai/create-from-oauth"
|
||||||
|
|
||||||
@@ -130,31 +130,52 @@ func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concu
|
|||||||
}
|
}
|
||||||
body, _ := json.Marshal(payload)
|
body, _ := json.Marshal(payload)
|
||||||
|
|
||||||
|
var lastErr error
|
||||||
|
for attempt := 0; attempt < 3; attempt++ {
|
||||||
|
if attempt > 0 {
|
||||||
|
time.Sleep(time.Duration(2+attempt*2) * time.Second) // 4s, 6s
|
||||||
|
}
|
||||||
|
|
||||||
req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))
|
req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))
|
||||||
req.Header.Set("Accept", "application/json")
|
req.Header.Set("Accept", "application/json")
|
||||||
req.Header.Set("Content-Type", "application/json")
|
req.Header.Set("Content-Type", "application/json")
|
||||||
req.Header.Set("X-Api-Key", s2aAdminKey)
|
req.Header.Set("X-Api-Key", s2aAdminKey)
|
||||||
|
|
||||||
resp, err := client.Do(req)
|
resp, err := httpClient.Do(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("请求失败: %v", err)
|
lastErr = fmt.Errorf("请求失败: %v", err)
|
||||||
|
continue // 网络错误可重试
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
respBody, _ := io.ReadAll(resp.Body)
|
respBody, _ := io.ReadAll(resp.Body)
|
||||||
|
resp.Body.Close()
|
||||||
|
|
||||||
|
// 5xx 服务端错误可重试
|
||||||
|
if resp.StatusCode >= 500 {
|
||||||
|
lastErr = fmt.Errorf("S2A 服务端错误 HTTP %d: %s", resp.StatusCode, string(respBody[:min(200, len(respBody))]))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// 非 200 的其他错误不重试
|
||||||
|
if resp.StatusCode != 200 {
|
||||||
|
return nil, fmt.Errorf("S2A HTTP %d: %s", resp.StatusCode, string(respBody[:min(200, len(respBody))]))
|
||||||
|
}
|
||||||
|
|
||||||
var result S2ACreateFromOAuthResponse
|
var result S2ACreateFromOAuthResponse
|
||||||
if err := json.Unmarshal(respBody, &result); err != nil {
|
if err := json.Unmarshal(respBody, &result); err != nil {
|
||||||
return nil, fmt.Errorf("解析响应失败: %v", err)
|
return nil, fmt.Errorf("解析响应失败 (HTTP %d): %v, body: %s", resp.StatusCode, err, string(respBody[:min(200, len(respBody))]))
|
||||||
}
|
}
|
||||||
|
|
||||||
if result.Code != 0 {
|
if result.Code != 0 {
|
||||||
return nil, fmt.Errorf("S2A 入库失败: %s", result.Message)
|
return nil, fmt.Errorf("S2A 入库失败: %s (code=%d)", result.Message, result.Code)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &result, nil
|
return &result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("S2A 入库失败 (重试耗尽): %v", lastErr)
|
||||||
|
}
|
||||||
|
|
||||||
// VerifyS2AAccount 验证账号入库状态
|
// VerifyS2AAccount 验证账号入库状态
|
||||||
func VerifyS2AAccount(s2aAPIBase, s2aAdminKey, email string) (bool, error) {
|
func VerifyS2AAccount(s2aAPIBase, s2aAdminKey, email string) (bool, error) {
|
||||||
client := &http.Client{Timeout: 30 * time.Second}
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
|||||||
Reference in New Issue
Block a user