feat(auth): Optimize retry logic and add circuit breaker for team processing
- Reduce authorization retry attempts from 3 to 2 and optimize retry delays from exponential (3s, 5s, 8s, 12s) to fixed 3s backoff - Implement team-level circuit breaker: skip member processing when 3+ consecutive 500 errors detected in same team - Add consecutive 500 error tracking with atomic counter and reset on successful authorization - Reduce ObtainAuthorizationCode timeout from 3 minutes to 90 seconds with updated error messages - Optimize Codex API workspace selection retry: reduce max attempts from 5 to 3 with shorter backoff (2s, 4s instead of 3s, 5s, 8s, 12s) - Reduce S2A OAuth submission retry delays from (4s, 6s) to (2s, 3s) for faster failure detection - Optimize member stagger timing: reduce from 3s + idx*2s to 1s + idx*1s with reduced jitter (0-1s instead of 0-2s) - Add early exit for exhausted retries in CompleteWithCodexAPI to prevent unnecessary outer retry attempts - These changes improve responsiveness and reduce cascading failures during bulk team processing
This commit is contained in:
@@ -730,6 +730,9 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
|||||||
var s2aSuccessCount int32
|
var s2aSuccessCount int32
|
||||||
var s2aFailCount int32
|
var s2aFailCount int32
|
||||||
|
|
||||||
|
// Team 级别 500 错误熔断器:当同 Team 多个成员连续遇到 500 时快速失败
|
||||||
|
var consecutive500Fails int32
|
||||||
|
|
||||||
// 入库并发控制信号量
|
// 入库并发控制信号量
|
||||||
s2aSem := make(chan struct{}, req.ConcurrentS2A)
|
s2aSem := make(chan struct{}, req.ConcurrentS2A)
|
||||||
|
|
||||||
@@ -779,14 +782,23 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
|||||||
var s2aSuccess bool
|
var s2aSuccess bool
|
||||||
var lastError string
|
var lastError string
|
||||||
|
|
||||||
for attempt := 0; attempt < 3; attempt++ { // 最多重试2次
|
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
|
||||||
// 检查停止信号
|
// 检查停止信号
|
||||||
if isStopped() {
|
if isStopped() {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
// 熔断检查:同 Team 已有 3+ 个成员连续 500 失败,快速跳过
|
||||||
|
if atomic.LoadInt32(&consecutive500Fails) >= 3 {
|
||||||
|
logger.Warning(fmt.Sprintf("%s 同 Team 已有多个成员连续 500 失败,跳过入库", memberLogPrefix), memberEmail, "team")
|
||||||
|
atomic.AddInt32(&s2aFailCount, 1)
|
||||||
|
memberMu.Lock()
|
||||||
|
result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库跳过: 服务器持续 500 (熔断)", memberIdx+1))
|
||||||
|
memberMu.Unlock()
|
||||||
|
return false
|
||||||
|
}
|
||||||
if attempt > 0 {
|
if attempt > 0 {
|
||||||
// 重试前等待一段时间,避免密集请求(可被停止信号中断)
|
// 重试前短暂等待(可被停止信号中断)
|
||||||
retryDelay := time.Duration(3+attempt*2) * time.Second
|
retryDelay := time.Duration(3) * time.Second
|
||||||
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team")
|
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次, 等待 %ds)", memberLogPrefix, attempt+1, int(retryDelay.Seconds())), memberEmail, "team")
|
||||||
select {
|
select {
|
||||||
case <-time.After(retryDelay):
|
case <-time.After(retryDelay):
|
||||||
@@ -847,8 +859,14 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
lastError = fmt.Sprintf("浏览器授权失败: %v", err)
|
lastError = fmt.Sprintf("浏览器授权失败: %v", err)
|
||||||
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team")
|
logger.Error(fmt.Sprintf("%s %s", memberLogPrefix, lastError), memberEmail, "team")
|
||||||
|
// 跟踪 500 错误用于熔断
|
||||||
|
if strings.Contains(err.Error(), "500") || strings.Contains(err.Error(), "重试已耗尽") {
|
||||||
|
atomic.AddInt32(&consecutive500Fails, 1)
|
||||||
|
}
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// 授权成功,重置 500 计数
|
||||||
|
atomic.StoreInt32(&consecutive500Fails, 0)
|
||||||
|
|
||||||
// 提交到 S2A
|
// 提交到 S2A
|
||||||
_, err = auth.SubmitS2AOAuth(
|
_, err = auth.SubmitS2AOAuth(
|
||||||
@@ -964,8 +982,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
|||||||
s2aWg.Add(1)
|
s2aWg.Add(1)
|
||||||
go func(idx int, e, p string) {
|
go func(idx int, e, p string) {
|
||||||
defer s2aWg.Done()
|
defer s2aWg.Done()
|
||||||
// 基础 3 秒 + 成员索引 * 2 秒错开 + 0~2 秒随机抖动,避免同 Team 多成员同时选工作区
|
// 基础 1 秒 + 成员索引 * 1 秒错开 + 0~1 秒随机抖动
|
||||||
stagger := 3*time.Second + time.Duration(idx*2)*time.Second + time.Duration(rand.Intn(2000))*time.Millisecond
|
stagger := 1*time.Second + time.Duration(idx)*time.Second + time.Duration(rand.Intn(1000))*time.Millisecond
|
||||||
select {
|
select {
|
||||||
case <-time.After(stagger):
|
case <-time.After(stagger):
|
||||||
case <-teamProcessState.stopCh:
|
case <-teamProcessState.stopCh:
|
||||||
|
|||||||
@@ -296,7 +296,7 @@ func (c *CodexAPIAuth) GetSessionID() string {
|
|||||||
return c.sessionID
|
return c.sessionID
|
||||||
}
|
}
|
||||||
|
|
||||||
// ObtainAuthorizationCode 获取授权码(全局 3 分钟超时)
|
// ObtainAuthorizationCode 获取授权码(全局 90 秒超时)
|
||||||
func (c *CodexAPIAuth) ObtainAuthorizationCode() (string, error) {
|
func (c *CodexAPIAuth) ObtainAuthorizationCode() (string, error) {
|
||||||
type authResult struct {
|
type authResult struct {
|
||||||
code string
|
code string
|
||||||
@@ -310,8 +310,8 @@ func (c *CodexAPIAuth) ObtainAuthorizationCode() (string, error) {
|
|||||||
select {
|
select {
|
||||||
case r := <-resultCh:
|
case r := <-resultCh:
|
||||||
return r.code, r.err
|
return r.code, r.err
|
||||||
case <-time.After(3 * time.Minute):
|
case <-time.After(90 * time.Second):
|
||||||
return "", fmt.Errorf("授权超时 (3分钟)")
|
return "", fmt.Errorf("授权超时 (90秒)")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -519,13 +519,13 @@ func (c *CodexAPIAuth) obtainAuthorizationCodeInternal() (string, error) {
|
|||||||
"workspace_id": c.workspaceID,
|
"workspace_id": c.workspaceID,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 添加 500 错误重试机制 - 最多重试 5 次,指数退避 + 随机抖动
|
// 添加 500 错误重试机制 - 最多重试 3 次,短退避 + 随机抖动
|
||||||
var lastErr error
|
var lastErr error
|
||||||
for retry := 0; retry < 5; retry++ {
|
for retry := 0; retry < 3; retry++ {
|
||||||
if retry > 0 {
|
if retry > 0 {
|
||||||
// 指数退避: 3s, 5s, 8s, 12s 基础延迟 + 0~3s 随机抖动
|
// 短退避: 2s, 4s 基础延迟 + 0~2s 随机抖动
|
||||||
baseDelay := time.Duration(3+retry*2) * time.Second
|
baseDelay := time.Duration(2+retry*2) * time.Second
|
||||||
jitter := time.Duration(rand.Intn(3000)) * time.Millisecond
|
jitter := time.Duration(rand.Intn(2000)) * time.Millisecond
|
||||||
delay := baseDelay + jitter
|
delay := baseDelay + jitter
|
||||||
c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区 (等待 %.1fs)...", retry+1, delay.Seconds())
|
c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区 (等待 %.1fs)...", retry+1, delay.Seconds())
|
||||||
time.Sleep(delay)
|
time.Sleep(delay)
|
||||||
@@ -705,6 +705,10 @@ func CompleteWithCodexAPI(email, password, workspaceID, authURL, sessionID, prox
|
|||||||
code, err := auth.ObtainAuthorizationCode()
|
code, err := auth.ObtainAuthorizationCode()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
auth.tlsClient.Close()
|
auth.tlsClient.Close()
|
||||||
|
// 如果内层重试已耗尽(持续 500),直接返回不再外层重试
|
||||||
|
if strings.Contains(err.Error(), "重试已耗尽") || strings.Contains(err.Error(), "授权超时") {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
// 检查是否为 403 错误
|
// 检查是否为 403 错误
|
||||||
if strings.Contains(err.Error(), "403") {
|
if strings.Contains(err.Error(), "403") {
|
||||||
lastErr = err
|
lastErr = err
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concu
|
|||||||
var lastErr error
|
var lastErr error
|
||||||
for attempt := 0; attempt < 3; attempt++ {
|
for attempt := 0; attempt < 3; attempt++ {
|
||||||
if attempt > 0 {
|
if attempt > 0 {
|
||||||
time.Sleep(time.Duration(2+attempt*2) * time.Second) // 4s, 6s
|
time.Sleep(time.Duration(1+attempt) * time.Second) // 2s, 3s
|
||||||
}
|
}
|
||||||
|
|
||||||
req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))
|
req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))
|
||||||
|
|||||||
Reference in New Issue
Block a user