diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 7e612e6..fdff0af 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -114,6 +114,9 @@ func startServer(cfg *config.Config) { mux.HandleFunc("/api/s2a/clean-errors", api.CORS(api.HandleCleanErrorAccounts)) // 清理错误账号 mux.HandleFunc("/api/s2a/cleaner/settings", api.CORS(handleCleanerSettings)) // 清理服务设置 + // 统计 API + mux.HandleFunc("/api/stats/max-rpm", api.CORS(api.HandleGetMaxRPM)) // 今日最高 RPM + // 邮箱服务 API mux.HandleFunc("/api/mail/services", api.CORS(handleMailServices)) mux.HandleFunc("/api/mail/services/test", api.CORS(handleTestMailService)) @@ -495,15 +498,75 @@ func handleLogStream(w http.ResponseWriter, r *http.Request) { func handleS2ATest(w http.ResponseWriter, r *http.Request) { if config.Global == nil || config.Global.S2AApiBase == "" { - api.Error(w, http.StatusBadRequest, "S2A 配置未设置") + api.Error(w, http.StatusBadRequest, "S2A API 地址未设置") return } - // 简单测试连接 - api.Success(w, map[string]interface{}{ - "connected": true, - "message": "S2A 配置已就绪", - }) + if config.Global.S2AAdminKey == "" { + api.Error(w, http.StatusBadRequest, "S2A Admin Key 未设置") + return + } + + // 请求 S2A 仪表盘接口 + dashboardURL := config.Global.S2AApiBase + "/api/v1/admin/dashboard" + req, err := http.NewRequest("GET", dashboardURL, nil) + if err != nil { + api.Error(w, http.StatusInternalServerError, "创建请求失败") + return + } + + // 设置认证头 + adminKey := config.Global.S2AAdminKey + req.Header.Set("Authorization", "Bearer "+adminKey) + req.Header.Set("X-API-Key", adminKey) + req.Header.Set("X-Admin-Key", adminKey) + req.Header.Set("Accept", "application/json") + + // 发送请求 + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + logger.Error(fmt.Sprintf("S2A 连接测试失败: %v", err), "", "s2a") + api.Error(w, http.StatusBadGateway, fmt.Sprintf("无法连接 S2A: %v", err)) + return + } + defer resp.Body.Close() + + // 读取响应 + bodyBytes, _ := io.ReadAll(resp.Body) + + // 检查响应状态 + if resp.StatusCode == 200 { + // 解析仪表盘数据 + var dashboardData map[string]interface{} + if err := json.Unmarshal(bodyBytes, &dashboardData); err == nil { + logger.Success("S2A 连接测试成功", "", "s2a") + api.Success(w, map[string]interface{}{ + "connected": true, + "message": "S2A 连接成功", + "dashboard": dashboardData, + }) + return + } + // 解析失败但状态码 200,仍然认为连接成功 + logger.Success("S2A 连接测试成功", "", "s2a") + api.Success(w, map[string]interface{}{ + "connected": true, + "message": "S2A 连接成功", + }) + return + } + + // 认证失败 + if resp.StatusCode == 401 || resp.StatusCode == 403 { + logger.Error("S2A 认证失败: Admin Key 无效", "", "s2a") + api.Error(w, http.StatusUnauthorized, "S2A Admin Key 无效") + return + } + + // 其他错误 + logger.Error(fmt.Sprintf("S2A 响应错误: %d", resp.StatusCode), "", "s2a") + api.Error(w, http.StatusBadGateway, fmt.Sprintf("S2A 响应错误: %d", resp.StatusCode)) } // handleS2AProxy 代理 S2A API 请求 @@ -569,6 +632,16 @@ func handleS2AProxy(w http.ResponseWriter, r *http.Request) { // 记录响应状态(仅显示状态码和长度) logger.Info(fmt.Sprintf("S2A 响应: status=%d, len=%d", resp.StatusCode, len(bodyBytes)), "", "proxy") + // 如果是 dashboard 请求且成功,提取 RPM 并更新最高值 + if resp.StatusCode == 200 && strings.Contains(targetPath, "dashboard") { + rpm := api.ExtractRPMFromDashboard(bodyBytes) + if rpm > 0 { + if api.UpdateMaxRPM(rpm) { + logger.Info(fmt.Sprintf("更新今日最高 RPM: %d", rpm), "", "stats") + } + } + } + // 复制响应头 for key, values := range resp.Header { for _, value := range values { diff --git a/backend/internal/api/max_rpm.go b/backend/internal/api/max_rpm.go new file mode 100644 index 0000000..644a0c0 --- /dev/null +++ b/backend/internal/api/max_rpm.go @@ -0,0 +1,112 @@ +package api + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "codex-pool/internal/database" +) + +// MaxRPMData 最高 RPM 数据 +type MaxRPMData struct { + MaxRPM int `json:"max_rpm"` + Date string `json:"date"` + UpdatedAt string `json:"updated_at,omitempty"` +} + +// GetMaxRPMToday 获取今日最高 RPM +func GetMaxRPMToday() MaxRPMData { + if database.Instance == nil { + return MaxRPMData{MaxRPM: 0, Date: time.Now().Format("2006-01-02")} + } + + today := time.Now().Format("2006-01-02") + + // 获取存储的日期 + storedDate, _ := database.Instance.GetConfig("max_rpm_date") + + // 如果日期不是今天,重置 + if storedDate != today { + return MaxRPMData{MaxRPM: 0, Date: today} + } + + // 获取今日最高 RPM + maxRPMStr, _ := database.Instance.GetConfig("max_rpm_today") + maxRPM, _ := strconv.Atoi(maxRPMStr) + + updatedAt, _ := database.Instance.GetConfig("max_rpm_updated_at") + + return MaxRPMData{ + MaxRPM: maxRPM, + Date: today, + UpdatedAt: updatedAt, + } +} + +// UpdateMaxRPM 更新今日最高 RPM(如果当前值更高) +func UpdateMaxRPM(currentRPM int) bool { + if database.Instance == nil || currentRPM <= 0 { + return false + } + + today := time.Now().Format("2006-01-02") + + // 获取存储的日期 + storedDate, _ := database.Instance.GetConfig("max_rpm_date") + + var currentMax int + if storedDate == today { + // 同一天,获取当前最高值 + maxRPMStr, _ := database.Instance.GetConfig("max_rpm_today") + currentMax, _ = strconv.Atoi(maxRPMStr) + } else { + // 新的一天,重置 + currentMax = 0 + database.Instance.SetConfig("max_rpm_date", today) + } + + // 如果当前 RPM 更高,更新记录 + if currentRPM > currentMax { + database.Instance.SetConfig("max_rpm_today", strconv.Itoa(currentRPM)) + database.Instance.SetConfig("max_rpm_updated_at", time.Now().Format("15:04:05")) + return true + } + + return false +} + +// HandleGetMaxRPM GET /api/stats/max-rpm - 获取今日最高 RPM +func HandleGetMaxRPM(w http.ResponseWriter, r *http.Request) { + if r.Method != http.MethodGet { + Error(w, http.StatusMethodNotAllowed, "仅支持 GET") + return + } + + data := GetMaxRPMToday() + Success(w, data) +} + +// ExtractRPMFromDashboard 从仪表盘响应中提取 RPM 值 +func ExtractRPMFromDashboard(body []byte) int { + var data map[string]interface{} + if err := json.Unmarshal(body, &data); err != nil { + return 0 + } + + // 尝试从不同的响应结构中提取 RPM + // 格式1: { "rpm": 123 } + if rpm, ok := data["rpm"].(float64); ok { + return int(rpm) + } + + // 格式2: { "data": { "rpm": 123 } } + if dataObj, ok := data["data"].(map[string]interface{}); ok { + if rpm, ok := dataObj["rpm"].(float64); ok { + return int(rpm) + } + } + + return 0 +} diff --git a/backend/internal/auth/codex_api.go b/backend/internal/auth/codex_api.go index 9b47ac0..0b93333 100644 --- a/backend/internal/auth/codex_api.go +++ b/backend/internal/auth/codex_api.go @@ -601,44 +601,77 @@ func (c *CodexAPIAuth) ObtainAuthorizationCode() (string, error) { "workspace_id": c.workspaceID, } - resp, body, err = c.doRequest("POST", "https://auth.openai.com/api/accounts/workspace/select", workspacePayload, workspaceHeaders) - if err != nil || resp.StatusCode != 200 { + // 添加 500 错误重试机制 - 最多重试 3 次 + var lastErr error + for retry := 0; retry < 3; retry++ { + if retry > 0 { + c.logStep(StepSelectWorkspace, "第 %d 次重试选择工作区...", retry+1) + time.Sleep(time.Duration(2+retry) * time.Second) // 递增延迟: 2s, 3s, 4s + + // 重新获取 Sentinel token + if !c.callSentinelReq("password_verify__auto") { + c.callSentinelReq("email_otp_validate__auto") + } + workspaceHeaders["OpenAI-Sentinel-Token"] = c.getSentinelHeader("workspace_select") + } + + resp, body, err = c.doRequest("POST", "https://auth.openai.com/api/accounts/workspace/select", workspacePayload, workspaceHeaders) + if err != nil { + lastErr = fmt.Errorf("请求失败: %v", err) + continue + } + + // 成功 + if resp.StatusCode == 200 { + json.Unmarshal(body, &data) + continueURL, ok := data["continue_url"].(string) + if !ok || continueURL == "" { + c.logError(StepSelectWorkspace, "未获取到 continue_url, 响应: %s", string(body[:min(500, len(body))])) + return "", fmt.Errorf("未获取到 continue_url") + } + + // 7. 跟随重定向获取授权码 + c.logStep(StepWaitCallback, "跟随重定向...") + for i := 0; i < 10; i++ { + resp, _, err = c.doRequest("GET", continueURL, nil, headers) + if err != nil { + break + } + + if resp.StatusCode >= 300 && resp.StatusCode < 400 { + location := resp.Header.Get("Location") + if strings.Contains(location, "localhost:1455") { + code := ExtractCodeFromCallbackURL(location) + if code != "" { + c.logStep(StepComplete, "授权成功,获取到授权码") + return code, nil + } + } + continueURL = location + } else { + break + } + } + + c.logError(StepWaitCallback, "未能获取授权码") + return "", fmt.Errorf("未能获取授权码") + } + + // 5xx 服务器错误,可重试 + if resp.StatusCode >= 500 && resp.StatusCode < 600 { + c.logStep(StepSelectWorkspace, "服务器错误 %d,将重试...", resp.StatusCode) + lastErr = fmt.Errorf("服务器错误: %d", resp.StatusCode) + continue + } + + // 其他错误,不重试 c.logError(StepSelectWorkspace, "选择工作区失败: %d - %s", resp.StatusCode, string(body[:min(200, len(body))])) return "", fmt.Errorf("选择工作区失败: %d", resp.StatusCode) } - json.Unmarshal(body, &data) - continueURL, ok := data["continue_url"].(string) - if !ok || continueURL == "" { - c.logError(StepSelectWorkspace, "未获取到 continue_url, 响应: %s", string(body[:min(500, len(body))])) - return "", fmt.Errorf("未获取到 continue_url") - } - - // 7. 跟随重定向获取授权码 - c.logStep(StepWaitCallback, "跟随重定向...") - for i := 0; i < 10; i++ { - resp, _, err = c.doRequest("GET", continueURL, nil, headers) - if err != nil { - break - } - - if resp.StatusCode >= 300 && resp.StatusCode < 400 { - location := resp.Header.Get("Location") - if strings.Contains(location, "localhost:1455") { - code := ExtractCodeFromCallbackURL(location) - if code != "" { - c.logStep(StepComplete, "授权成功,获取到授权码") - return code, nil - } - } - continueURL = location - } else { - break - } - } - - c.logError(StepWaitCallback, "未能获取授权码") - return "", fmt.Errorf("未能获取授权码") + // 重试耗尽 + c.logError(StepSelectWorkspace, "选择工作区失败,重试已耗尽: %v", lastErr) + return "", fmt.Errorf("选择工作区失败 (重试已耗尽): %v", lastErr) } // ExchangeCodeForTokens 用授权码换取 tokens diff --git a/frontend/src/components/dashboard/PoolStatus.tsx b/frontend/src/components/dashboard/PoolStatus.tsx index 3e90db0..a3d3b66 100644 --- a/frontend/src/components/dashboard/PoolStatus.tsx +++ b/frontend/src/components/dashboard/PoolStatus.tsx @@ -1,4 +1,5 @@ -import { Users, CheckCircle, XCircle, AlertTriangle, Zap, Activity } from 'lucide-react' +import { useState, useEffect } from 'react' +import { Users, CheckCircle, XCircle, AlertTriangle, Zap, Activity, TrendingUp } from 'lucide-react' import type { DashboardStats } from '../../types' import StatsCard from './StatsCard' @@ -9,6 +10,25 @@ interface PoolStatusProps { } export default function PoolStatus({ stats, loading, error }: PoolStatusProps) { + const [maxRpm, setMaxRpm] = useState(0) + + // 获取今日最高 RPM + useEffect(() => { + const fetchMaxRpm = async () => { + try { + const res = await fetch('/api/stats/max-rpm') + const data = await res.json() + if (data.code === 0 && data.data) { + setMaxRpm(data.data.max_rpm || 0) + } + } catch (error) { + console.error('Failed to fetch max RPM:', error) + } + } + fetchMaxRpm() + // 每次 stats 更新时也刷新 max RPM + }, [stats]) + if (error) { return (
@@ -58,7 +78,17 @@ export default function PoolStatus({ stats, loading, error }: PoolStatusProps) { color="slate" loading={loading} /> - + {/* RPM 卡片 - 显示当前 RPM 和今日最高 */} +
+ + {/* 今日最高 RPM 显示在卡片下方 */} + {maxRpm > 0 && ( +
+ + 今日最高: {maxRpm} +
+ )} +
) }