Files
codexautopool/backend/internal/auth/s2a.go
kyx236 847574e89e feat(auth): Optimize retry logic and add circuit breaker for team processing
- Reduce authorization retry attempts from 3 to 2 and optimize retry delays from exponential (3s, 5s, 8s, 12s) to fixed 3s backoff
- Implement team-level circuit breaker: skip member processing when 3+ consecutive 500 errors detected in same team
- Add consecutive 500 error tracking with atomic counter and reset on successful authorization
- Reduce ObtainAuthorizationCode timeout from 3 minutes to 90 seconds with updated error messages
- Optimize Codex API workspace selection retry: reduce max attempts from 5 to 3 with shorter backoff (2s, 4s instead of 3s, 5s, 8s, 12s)
- Reduce S2A OAuth submission retry delays from (4s, 6s) to (2s, 3s) for faster failure detection
- Optimize member stagger timing: reduce from 3s + idx*2s to 1s + idx*1s with reduced jitter (0-1s instead of 0-2s)
- Add early exit for exhausted retries in CompleteWithCodexAPI to prevent unnecessary outer retry attempts
- These changes improve responsiveness and reduce cascading failures during bulk team processing
2026-02-07 23:19:06 +08:00

320 lines
8.5 KiB
Go

package auth
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"codex-pool/internal/proxyutil"
)
const (
CodexClientID = "app_EMoamEEZ73f0CkXaXp7hrann"
CodexRedirectURI = "http://localhost:1455/auth/callback"
CodexScope = "openid profile email offline_access"
)
// CodexTokens Codex Token 结构
type CodexTokens struct {
AccessToken string `json:"access_token"`
RefreshToken string `json:"refresh_token"`
IDToken string `json:"id_token"`
TokenType string `json:"token_type"`
ExpiresIn int `json:"expires_in"`
ExpiredAt string `json:"expired_at,omitempty"`
}
// S2AAuthURLRequest S2A 授权 URL 请求
type S2AAuthURLRequest struct {
ProxyID *int `json:"proxy_id,omitempty"`
}
// S2AAuthURLResponse S2A 授权 URL 响应
type S2AAuthURLResponse struct {
Code int `json:"code"`
Data struct {
AuthURL string `json:"auth_url"`
SessionID string `json:"session_id"`
} `json:"data"`
Message string `json:"message,omitempty"`
}
// S2ACreateFromOAuthRequest 提交 OAuth 入库请求
type S2ACreateFromOAuthRequest struct {
SessionID string `json:"session_id"`
Code string `json:"code"`
Name string `json:"name,omitempty"`
Concurrency int `json:"concurrency,omitempty"`
Priority int `json:"priority,omitempty"`
GroupIDs []int `json:"group_ids,omitempty"`
ProxyID *int `json:"proxy_id,omitempty"`
}
// S2ACreateFromOAuthResponse 入库响应
type S2ACreateFromOAuthResponse struct {
Code int `json:"code"`
Data struct {
ID int `json:"id"`
Name string `json:"name"`
Platform string `json:"platform"`
Type string `json:"type"`
Status string `json:"status"`
Concurrency int `json:"concurrency"`
Priority int `json:"priority"`
} `json:"data"`
Message string `json:"message,omitempty"`
}
// GenerateS2AAuthURL 从 S2A 生成 Codex 授权 URL
func GenerateS2AAuthURL(s2aAPIBase, s2aAdminKey string, proxyID *int) (*S2AAuthURLResponse, error) {
client := &http.Client{Timeout: 15 * time.Second}
apiURL := s2aAPIBase + "/api/v1/admin/openai/generate-auth-url"
payload := S2AAuthURLRequest{ProxyID: proxyID}
body, _ := json.Marshal(payload)
req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))
req.Header.Set("Accept", "application/json")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Api-Key", s2aAdminKey)
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("请求失败: %v", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
if len(respBody) > 0 && respBody[0] == '<' {
return nil, fmt.Errorf("服务器返回 HTML: %s", string(respBody)[:min(100, len(respBody))])
}
if resp.StatusCode != 200 {
return nil, fmt.Errorf("HTTP %d: %s", resp.StatusCode, string(respBody)[:min(200, len(respBody))])
}
var result S2AAuthURLResponse
if err := json.Unmarshal(respBody, &result); err != nil {
return nil, fmt.Errorf("解析响应失败: %v, body: %s", err, string(respBody)[:min(100, len(respBody))])
}
if result.Code != 0 {
return nil, fmt.Errorf("S2A 错误: %s", result.Message)
}
return &result, nil
}
// SubmitS2AOAuth 提交 OAuth code 到 S2A 入库(带重试)
func SubmitS2AOAuth(s2aAPIBase, s2aAdminKey, sessionID, code, name string, concurrency, priority int, groupIDs []int, proxyID *int) (*S2ACreateFromOAuthResponse, error) {
httpClient := &http.Client{Timeout: 30 * time.Second}
apiURL := s2aAPIBase + "/api/v1/admin/openai/create-from-oauth"
payload := S2ACreateFromOAuthRequest{
SessionID: sessionID,
Code: code,
Name: name,
Concurrency: concurrency,
Priority: priority,
GroupIDs: groupIDs,
ProxyID: proxyID,
}
body, _ := json.Marshal(payload)
var lastErr error
for attempt := 0; attempt < 3; attempt++ {
if attempt > 0 {
time.Sleep(time.Duration(1+attempt) * time.Second) // 2s, 3s
}
req, _ := http.NewRequest("POST", apiURL, bytes.NewReader(body))
req.Header.Set("Accept", "application/json")
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Api-Key", s2aAdminKey)
resp, err := httpClient.Do(req)
if err != nil {
lastErr = fmt.Errorf("请求失败: %v", err)
continue // 网络错误可重试
}
respBody, _ := io.ReadAll(resp.Body)
resp.Body.Close()
// 5xx 服务端错误可重试
if resp.StatusCode >= 500 {
lastErr = fmt.Errorf("S2A 服务端错误 HTTP %d: %s", resp.StatusCode, string(respBody[:min(200, len(respBody))]))
continue
}
// 非 200 的其他错误不重试
if resp.StatusCode != 200 {
return nil, fmt.Errorf("S2A HTTP %d: %s", resp.StatusCode, string(respBody[:min(200, len(respBody))]))
}
var result S2ACreateFromOAuthResponse
if err := json.Unmarshal(respBody, &result); err != nil {
return nil, fmt.Errorf("解析响应失败 (HTTP %d): %v, body: %s", resp.StatusCode, err, string(respBody[:min(200, len(respBody))]))
}
if result.Code != 0 {
return nil, fmt.Errorf("S2A 入库失败: %s (code=%d)", result.Message, result.Code)
}
return &result, nil
}
return nil, fmt.Errorf("S2A 入库失败 (重试耗尽): %v", lastErr)
}
// VerifyS2AAccount 验证账号入库状态
func VerifyS2AAccount(s2aAPIBase, s2aAdminKey, email string) (bool, error) {
client := &http.Client{Timeout: 30 * time.Second}
apiURL := fmt.Sprintf("%s/api/v1/admin/accounts?page=1&page_size=20&search=%s&timezone=Asia/Shanghai", s2aAPIBase, url.QueryEscape(email))
req, _ := http.NewRequest("GET", apiURL, nil)
req.Header.Set("Accept", "application/json")
req.Header.Set("X-Api-Key", s2aAdminKey)
resp, err := client.Do(req)
if err != nil {
return false, fmt.Errorf("请求失败: %v", err)
}
defer resp.Body.Close()
respBody, _ := io.ReadAll(resp.Body)
var result struct {
Code int `json:"code"`
Data struct {
Items []struct {
ID int `json:"id"`
Name string `json:"name"`
Status string `json:"status"`
} `json:"items"`
Total int `json:"total"`
} `json:"data"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return false, fmt.Errorf("解析响应失败: %v", err)
}
if result.Code != 0 || result.Data.Total == 0 {
return false, nil
}
for _, item := range result.Data.Items {
if item.Status == "active" {
return true, nil
}
}
return false, nil
}
// ExtractCodeFromCallbackURL 从回调 URL 中提取 code
func ExtractCodeFromCallbackURL(callbackURL string) string {
parsedURL, err := url.Parse(callbackURL)
if err != nil {
return ""
}
return parsedURL.Query().Get("code")
}
// RefreshCodexToken 刷新 Codex token
func RefreshCodexToken(refreshToken string, proxyURL string) (*CodexTokens, error) {
client := &http.Client{Timeout: 30 * time.Second}
if proxyURL != "" {
info, err := proxyutil.Parse(proxyURL)
if err != nil {
return nil, fmt.Errorf("代理格式错误: %v", err)
}
if info.URL != nil {
client.Transport = &http.Transport{Proxy: http.ProxyURL(info.URL)}
}
}
data := url.Values{
"client_id": {CodexClientID},
"grant_type": {"refresh_token"},
"refresh_token": {refreshToken},
"scope": {"openid profile email"},
}
req, _ := http.NewRequest("POST", "https://auth.openai.com/oauth/token", strings.NewReader(data.Encode()))
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
resp, err := client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, _ := io.ReadAll(resp.Body)
if resp.StatusCode != 200 {
return nil, fmt.Errorf("刷新 token 失败: %d, %s", resp.StatusCode, string(body)[:min(200, len(body))])
}
var tokens CodexTokens
if err := json.Unmarshal(body, &tokens); err != nil {
return nil, err
}
if tokens.ExpiresIn > 0 {
expiredAt := time.Now().Add(time.Duration(tokens.ExpiresIn) * time.Second)
tokens.ExpiredAt = expiredAt.Format(time.RFC3339)
}
return &tokens, nil
}
// ExtractWorkspaceFromCookie 从 cookie 提取 workspace_id
func ExtractWorkspaceFromCookie(cookieValue string) string {
parts := strings.Split(cookieValue, ".")
if len(parts) < 1 {
return ""
}
payload := parts[0]
if m := len(payload) % 4; m != 0 {
payload += strings.Repeat("=", 4-m)
}
decoded, err := base64.StdEncoding.DecodeString(payload)
if err != nil {
decoded, err = base64.RawURLEncoding.DecodeString(parts[0])
if err != nil {
return ""
}
}
var data struct {
Workspaces []struct {
ID string `json:"id"`
} `json:"workspaces"`
}
if err := json.Unmarshal(decoded, &data); err != nil {
return ""
}
if len(data.Workspaces) > 0 {
return data.Workspaces[0].ID
}
return ""
}