package main import ( "bytes" "encoding/json" "fmt" "io" "net" "net/http" "os" "path/filepath" "strings" "time" "codex-pool/internal/api" "codex-pool/internal/config" "codex-pool/internal/database" "codex-pool/internal/logger" "codex-pool/internal/mail" "codex-pool/internal/register" "codex-pool/internal/web" ) func main() { // ANSI 颜色代码 colorReset := "\033[0m" colorCyan := "\033[36m" colorGreen := "\033[32m" colorYellow := "\033[33m" colorGray := "\033[90m" colorBold := "\033[1m" fmt.Printf("%s%s============================================================%s\n", colorBold, colorCyan, colorReset) fmt.Printf("%s%s Codex Pool - HTTP API Server%s\n", colorBold, colorCyan, colorReset) fmt.Printf("%s%s============================================================%s\n\n", colorBold, colorCyan, colorReset) // 确定数据目录 dataDir := "data" // 确保数据目录存在 if err := os.MkdirAll(dataDir, 0755); err != nil { fmt.Printf("%s[WARN]%s 创建数据目录失败: %v, 使用当前目录\n", colorYellow, colorReset, err) dataDir = "." } // 初始化数据库 (先于配置) dbPath := filepath.Join(dataDir, "codex-pool.db") if err := database.Init(dbPath); err != nil { fmt.Printf("%s[ERROR]%s 数据库初始化失败: %v\n", "\033[31m", colorReset, err) os.Exit(1) } // 设置配置数据库并加载配置 config.SetConfigDB(database.Instance) cfg := config.InitFromDB() // 初始化邮箱服务 if len(cfg.MailServices) > 0 { mail.Init(cfg.MailServices) fmt.Printf("%s[邮箱]%s 已加载 %d 个邮箱服务\n", colorGreen, colorReset, len(cfg.MailServices)) } fmt.Printf("%s[配置]%s 数据库: %s\n", colorGray, colorReset, dbPath) fmt.Printf("%s[配置]%s 端口: %d\n", colorGray, colorReset, cfg.Port) if cfg.S2AApiBase != "" { fmt.Printf("%s[配置]%s S2A API: %s\n", colorGray, colorReset, cfg.S2AApiBase) } else { fmt.Printf("%s[配置]%s S2A API: %s未配置%s (请在Web界面配置)\n", colorGray, colorReset, colorYellow, colorReset) } if cfg.ProxyEnabled { fmt.Printf("%s[配置]%s 代理: %s (已启用)\n", colorGray, colorReset, cfg.DefaultProxy) } else { fmt.Printf("%s[配置]%s 代理: 已禁用\n", colorGray, colorReset) } if web.IsEmbedded() { fmt.Printf("%s[前端]%s 嵌入模式\n", colorGreen, colorReset) } else { fmt.Printf("%s[前端]%s 开发模式 (未嵌入)\n", colorYellow, colorReset) } fmt.Println() // 启动自动补号服务 api.StartAutoAddService() fmt.Printf("%s[服务]%s 自动补号服务已启动\n", colorGreen, colorReset) // 启动服务器 startServer(cfg) } func startServer(cfg *config.Config) { mux := http.NewServeMux() // 基础 API mux.HandleFunc("/api/health", api.CORS(handleHealth)) mux.HandleFunc("/api/config", api.CORS(handleConfig)) // 日志 API mux.HandleFunc("/api/logs", api.CORS(handleGetLogs)) mux.HandleFunc("/api/logs/clear", api.CORS(handleClearLogs)) mux.HandleFunc("/api/logs/stream", handleLogStream) // SSE 实时日志 // S2A 代理 API mux.HandleFunc("/api/s2a/test", api.CORS(handleS2ATest)) mux.HandleFunc("/api/s2a/proxy/", api.CORS(handleS2AProxy)) // 通配代理 mux.HandleFunc("/api/s2a/clean-errors", api.CORS(api.HandleCleanErrorAccounts)) // 清理错误账号 // 邮箱服务 API mux.HandleFunc("/api/mail/services", api.CORS(handleMailServices)) mux.HandleFunc("/api/mail/services/test", api.CORS(handleTestMailService)) // Team Owner API mux.HandleFunc("/api/db/owners", api.CORS(handleGetOwners)) mux.HandleFunc("/api/db/owners/stats", api.CORS(handleGetOwnerStats)) mux.HandleFunc("/api/db/owners/clear", api.CORS(handleClearOwners)) mux.HandleFunc("/api/db/owners/refetch-account-ids", api.CORS(api.HandleRefetchAccountIDs)) mux.HandleFunc("/api/upload/validate", api.CORS(api.HandleUploadValidate)) // 注册测试 API mux.HandleFunc("/api/register/test", api.CORS(handleRegisterTest)) // Team 批量处理 API mux.HandleFunc("/api/team/process", api.CORS(api.HandleTeamProcess)) mux.HandleFunc("/api/team/status", api.CORS(api.HandleTeamProcessStatus)) mux.HandleFunc("/api/team/stop", api.CORS(api.HandleTeamProcessStop)) // 批次记录 API mux.HandleFunc("/api/batch/runs", api.CORS(handleBatchRuns)) mux.HandleFunc("/api/batch/stats", api.CORS(handleBatchStats)) // 监控设置 API mux.HandleFunc("/api/monitor/settings", api.CORS(api.HandleGetMonitorSettings)) mux.HandleFunc("/api/monitor/settings/save", api.CORS(api.HandleSaveMonitorSettings)) // 嵌入的前端静态文件 if web.IsEmbedded() { webFS := web.GetFileSystem() fileServer := http.FileServer(webFS) mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { // API 请求不处理 if strings.HasPrefix(r.URL.Path, "/api/") { http.NotFound(w, r) return } // SPA 路由:非静态资源返回 index.html path := r.URL.Path if path != "/" && !strings.Contains(path, ".") { r.URL.Path = "/" } fileServer.ServeHTTP(w, r) }) } addr := fmt.Sprintf(":%d", cfg.Port) // ANSI 颜色代码 colorReset := "\033[0m" colorGreen := "\033[32m" colorCyan := "\033[36m" // 显示访问地址 fmt.Printf("%s[服务]%s 启动于:\n", colorGreen, colorReset) fmt.Printf(" - 本地: %shttp://localhost:%d%s\n", colorCyan, cfg.Port, colorReset) if ip := getOutboundIP(); ip != "" { fmt.Printf(" - 外部: %shttp://%s:%d%s\n", colorCyan, ip, cfg.Port, colorReset) } fmt.Println() if err := http.ListenAndServe(addr, mux); err != nil { fmt.Printf("\033[31m[ERROR]\033[0m 服务启动失败: %v\n", err) os.Exit(1) } } // ==================== API 处理器 ==================== func handleHealth(w http.ResponseWriter, r *http.Request) { api.Success(w, map[string]string{"status": "ok"}) } func handleConfig(w http.ResponseWriter, r *http.Request) { switch r.Method { case http.MethodGet: // 获取配置 if config.Global == nil { api.Error(w, http.StatusInternalServerError, "配置未加载") return } api.Success(w, map[string]interface{}{ "port": config.Global.Port, "s2a_api_base": config.Global.S2AApiBase, "s2a_admin_key": config.Global.S2AAdminKey, "has_admin_key": config.Global.S2AAdminKey != "", "concurrency": config.Global.Concurrency, "priority": config.Global.Priority, "group_ids": config.Global.GroupIDs, "proxy_enabled": config.Global.ProxyEnabled, "default_proxy": config.Global.DefaultProxy, "mail_services_count": len(config.Global.MailServices), "mail_services": config.Global.MailServices, }) case http.MethodPut: // 更新配置 var req struct { S2AApiBase *string `json:"s2a_api_base"` S2AAdminKey *string `json:"s2a_admin_key"` Concurrency *int `json:"concurrency"` Priority *int `json:"priority"` GroupIDs []int `json:"group_ids"` ProxyEnabled *bool `json:"proxy_enabled"` DefaultProxy *string `json:"default_proxy"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { api.Error(w, http.StatusBadRequest, "请求格式错误") return } // 更新字段 if req.S2AApiBase != nil { config.Global.S2AApiBase = *req.S2AApiBase } if req.S2AAdminKey != nil { config.Global.S2AAdminKey = *req.S2AAdminKey } if req.Concurrency != nil { config.Global.Concurrency = *req.Concurrency } if req.Priority != nil { config.Global.Priority = *req.Priority } if req.GroupIDs != nil { config.Global.GroupIDs = req.GroupIDs } if req.ProxyEnabled != nil { config.Global.ProxyEnabled = *req.ProxyEnabled } if req.DefaultProxy != nil { config.Global.DefaultProxy = *req.DefaultProxy } // 保存到数据库 (实时生效) if err := config.Update(config.Global); err != nil { api.Error(w, http.StatusInternalServerError, fmt.Sprintf("保存配置失败: %v", err)) return } logger.Success("配置已更新并保存到数据库", "", "config") api.Success(w, map[string]string{"message": "配置已更新"}) default: api.Error(w, http.StatusMethodNotAllowed, "不支持的方法") } } func handleGetLogs(w http.ResponseWriter, r *http.Request) { logs := logger.GetLogs(100) api.Success(w, logs) } func handleClearLogs(w http.ResponseWriter, r *http.Request) { logger.ClearLogs() api.Success(w, map[string]string{"message": "日志已清空"}) } // handleBatchRuns 获取批次运行记录 func handleBatchRuns(w http.ResponseWriter, r *http.Request) { if database.Instance == nil { api.Error(w, http.StatusInternalServerError, "数据库未初始化") return } runs, err := database.Instance.GetBatchRuns(50) if err != nil { api.Error(w, http.StatusInternalServerError, fmt.Sprintf("获取记录失败: %v", err)) return } api.Success(w, runs) } // handleBatchStats 获取批次统计 func handleBatchStats(w http.ResponseWriter, r *http.Request) { if database.Instance == nil { api.Error(w, http.StatusInternalServerError, "数据库未初始化") return } stats := database.Instance.GetBatchRunStats() api.Success(w, stats) } // handleLogStream SSE 实时日志流 func handleLogStream(w http.ResponseWriter, r *http.Request) { // 设置 SSE 响应头 w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") // 生成唯一 ID listenerID := fmt.Sprintf("client-%d", time.Now().UnixNano()) // 订阅日志 logChan := logger.AddListener(listenerID) defer logger.RemoveListener(listenerID) // 获取 flusher flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "SSE not supported", http.StatusInternalServerError) return } // 发送初始连接确认 fmt.Fprintf(w, "data: {\"type\":\"connected\",\"id\":\"%s\"}\n\n", listenerID) flusher.Flush() // 监听日志和连接关闭 ctx := r.Context() for { select { case <-ctx.Done(): return case entry, ok := <-logChan: if !ok { return } // 发送日志条目 data, _ := json.Marshal(map[string]interface{}{ "type": "log", "timestamp": entry.Timestamp.Format("15:04:05"), "level": entry.Level, "message": entry.Message, "email": entry.Email, "module": entry.Module, }) fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() } } } func handleS2ATest(w http.ResponseWriter, r *http.Request) { if config.Global == nil || config.Global.S2AApiBase == "" { api.Error(w, http.StatusBadRequest, "S2A 配置未设置") return } // 简单测试连接 api.Success(w, map[string]interface{}{ "connected": true, "message": "S2A 配置已就绪", }) } // handleS2AProxy 代理 S2A API 请求 func handleS2AProxy(w http.ResponseWriter, r *http.Request) { if config.Global == nil || config.Global.S2AApiBase == "" || config.Global.S2AAdminKey == "" { api.Error(w, http.StatusBadRequest, "S2A 配置未设置") return } // 提取路径: /api/s2a/proxy/xxx -> 目标路径 path := strings.TrimPrefix(r.URL.Path, "/api/s2a/proxy") // 如果路径不是以 /api/ 开通的,默认补上 /api/v1/admin 开头(兼容 dashboard 统计等) // 如果已经是 /api/ 开头(如 /api/pool/polling),则保持原样 var targetPath string if strings.HasPrefix(path, "/api/") { targetPath = path } else { targetPath = "/api/v1/admin" + path } targetURL := config.Global.S2AApiBase + targetPath if r.URL.RawQuery != "" { targetURL += "?" + r.URL.RawQuery } logger.Info(fmt.Sprintf("S2A Proxy: %s", targetPath), "", "proxy") // 创建代理请求 proxyReq, err := http.NewRequest(r.Method, targetURL, r.Body) if err != nil { api.Error(w, http.StatusInternalServerError, "创建请求失败") return } // 设置认证头 adminKey := config.Global.S2AAdminKey proxyReq.Header.Set("Authorization", "Bearer "+adminKey) proxyReq.Header.Set("X-API-Key", adminKey) proxyReq.Header.Set("X-Admin-Key", adminKey) // 可能是这个 proxyReq.Header.Set("Content-Type", "application/json") proxyReq.Header.Set("Accept", "application/json") // 发送请求 client := &http.Client{Timeout: 30 * time.Second} resp, err := client.Do(proxyReq) if err != nil { logger.Error(fmt.Sprintf("S2A 请求失败: %v", err), "", "proxy") api.Error(w, http.StatusBadGateway, fmt.Sprintf("请求 S2A 失败: %v", err)) return } defer resp.Body.Close() // 读取响应体 bodyBytes, err := io.ReadAll(resp.Body) if err != nil { logger.Error(fmt.Sprintf("读取响应失败: %v", err), "", "proxy") api.Error(w, http.StatusBadGateway, "读取响应失败") return } // 记录响应状态(仅显示状态码和长度) logger.Info(fmt.Sprintf("S2A 响应: status=%d, len=%d", resp.StatusCode, len(bodyBytes)), "", "proxy") // 复制响应头 for key, values := range resp.Header { for _, value := range values { w.Header().Add(key, value) } } // 复制响应状态和内容 w.WriteHeader(resp.StatusCode) w.Write(bodyBytes) } func handleMailServices(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": services := mail.GetServices() // 返回完整配置(包括 token)供前端加载 result := make([]map[string]interface{}, len(services)) for i, s := range services { result[i] = map[string]interface{}{ "name": s.Name, "apiBase": s.APIBase, "apiToken": s.APIToken, "domain": s.Domain, "emailPath": s.EmailPath, "addUserApi": s.AddUserAPI, } } api.Success(w, result) case "POST": var req struct { Services []struct { Name string `json:"name"` APIBase string `json:"apiBase"` APIToken string `json:"apiToken"` Domain string `json:"domain"` EmailPath string `json:"emailPath"` AddUserAPI string `json:"addUserApi"` } `json:"services"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { logger.Error(fmt.Sprintf("解析邮箱服务配置失败: %v", err), "", "mail") api.Error(w, http.StatusBadRequest, "解析请求失败") return } // 转换为 config.MailServiceConfig var services []config.MailServiceConfig for _, s := range req.Services { emailPath := s.EmailPath if emailPath == "" { emailPath = "/api/public/emailList" } addUserAPI := s.AddUserAPI if addUserAPI == "" { addUserAPI = "/api/public/addUser" } services = append(services, config.MailServiceConfig{ Name: s.Name, APIBase: s.APIBase, APIToken: s.APIToken, Domain: s.Domain, EmailPath: emailPath, AddUserAPI: addUserAPI, }) } // 更新邮箱服务配置(内存) mail.Init(services) // 保存到全局配置 if config.Global != nil { config.Global.MailServices = services } // 持久化到数据库 if database.Instance != nil { jsonData, _ := json.Marshal(services) if err := database.Instance.SetConfig("mail_services", string(jsonData)); err != nil { logger.Error(fmt.Sprintf("保存邮箱配置到数据库失败: %v", err), "", "mail") } } logger.Success(fmt.Sprintf("邮箱服务配置已保存: %d 个服务", len(services)), "", "mail") for _, s := range services { logger.Info(fmt.Sprintf(" - %s (%s) @ %s", s.Name, s.Domain, s.APIBase), "", "mail") } api.Success(w, map[string]interface{}{ "message": fmt.Sprintf("已保存 %d 个邮箱服务配置", len(services)), "count": len(services), }) default: api.Error(w, http.StatusMethodNotAllowed, "不支持的方法") } } func handleTestMailService(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST") return } var req struct { Name string `json:"name"` APIBase string `json:"api_base"` APIToken string `json:"api_token"` Domain string `json:"domain"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { api.Error(w, http.StatusBadRequest, "解析请求失败") return } logger.Info(fmt.Sprintf("测试邮箱服务: %s (%s)", req.Name, req.Domain), "", "mail") logger.Info(fmt.Sprintf("API Base: %s", req.APIBase), "", "mail") // 创建测试请求 testEmail := "test@" + req.Domain payload := map[string]interface{}{ "list": []map[string]string{ {"email": testEmail, "password": "TestPass123#"}, }, } jsonData, _ := json.Marshal(payload) apiURL := req.APIBase + "/api/public/addUser" httpReq, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonData)) if err != nil { logger.Error(fmt.Sprintf("创建请求失败: %v", err), "", "mail") api.Error(w, http.StatusInternalServerError, "创建请求失败") return } httpReq.Header.Set("Authorization", req.APIToken) httpReq.Header.Set("Content-Type", "application/json") client := &http.Client{Timeout: 10 * time.Second} resp, err := client.Do(httpReq) if err != nil { logger.Error(fmt.Sprintf("请求失败: %v", err), "", "mail") api.Error(w, http.StatusBadGateway, fmt.Sprintf("连接失败: %v", err)) return } defer resp.Body.Close() bodyBytes, _ := io.ReadAll(resp.Body) bodyStr := string(bodyBytes) logger.Info(fmt.Sprintf("响应状态: %d", resp.StatusCode), "", "mail") logger.Info(fmt.Sprintf("响应内容: %s", bodyStr), "", "mail") // 解析响应 var result struct { Code int `json:"code"` Message string `json:"message"` } if err := json.Unmarshal(bodyBytes, &result); err != nil { logger.Error(fmt.Sprintf("解析响应失败: %v", err), "", "mail") api.Error(w, http.StatusInternalServerError, "解析响应失败") return } // 判断结果 // code 200 = 创建成功 // code 501 且消息包含"已存在" = 邮箱已存在,说明连接正常 isSuccess := result.Code == 200 || strings.Contains(result.Message, "exist") || strings.Contains(result.Message, "已存在") || (result.Code == 501 && strings.Contains(result.Message, "邮箱")) if isSuccess { logger.Success(fmt.Sprintf("邮箱服务测试成功: %s (邮箱已存在或创建成功)", req.Name), "", "mail") api.Success(w, map[string]interface{}{ "connected": true, "message": "邮箱服务连接成功", "response": result, }) } else { logger.Error(fmt.Sprintf("邮箱服务测试失败: %s - %s", req.Name, result.Message), "", "mail") api.Error(w, http.StatusBadRequest, fmt.Sprintf("API 错误: %s", result.Message)) } } func handleGetOwners(w http.ResponseWriter, r *http.Request) { if database.Instance == nil { api.Error(w, http.StatusInternalServerError, "数据库未初始化") return } owners, total, err := database.Instance.GetTeamOwners("", 50, 0) if err != nil { api.Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err)) return } api.Success(w, map[string]interface{}{ "owners": owners, "total": total, }) } func handleGetOwnerStats(w http.ResponseWriter, r *http.Request) { if database.Instance == nil { api.Error(w, http.StatusInternalServerError, "数据库未初始化") return } stats := database.Instance.GetOwnerStats() api.Success(w, stats) } func handleClearOwners(w http.ResponseWriter, r *http.Request) { if database.Instance == nil { api.Error(w, http.StatusInternalServerError, "数据库未初始化") return } if err := database.Instance.ClearTeamOwners(); err != nil { api.Error(w, http.StatusInternalServerError, fmt.Sprintf("清空失败: %v", err)) return } api.Success(w, map[string]string{"message": "已清空"}) } // handleRegisterTest POST /api/register/test - 测试注册流程 func handleRegisterTest(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST") return } var req struct { Proxy string `json:"proxy"` } json.NewDecoder(r.Body).Decode(&req) // 使用配置中的默认代理 proxy := req.Proxy if proxy == "" && config.Global != nil { proxy = config.Global.DefaultProxy } // 生成测试数据 email := mail.GenerateEmail() password := register.GeneratePassword() name := register.GenerateName() birthdate := register.GenerateBirthdate() logger.Info(fmt.Sprintf("开始注册测试: %s", email), email, "register") // 执行注册 reg, err := register.Run(email, password, name, birthdate, proxy) if err != nil { logger.Error(fmt.Sprintf("注册失败: %v", err), email, "register") api.Error(w, http.StatusInternalServerError, fmt.Sprintf("注册失败: %v", err)) return } logger.Success(fmt.Sprintf("注册成功: %s", email), email, "register") // 返回结果 api.Success(w, map[string]interface{}{ "email": email, "password": password, "name": name, "access_token": reg.AccessToken, }) } // getOutboundIP 获取本机出口 IP func getOutboundIP() string { // 方法1: 通过连接获取 conn, err := net.Dial("udp", "8.8.8.8:80") if err == nil { defer conn.Close() localAddr := conn.LocalAddr().(*net.UDPAddr) return localAddr.IP.String() } // 方法2: 遍历网卡 addrs, err := net.InterfaceAddrs() if err != nil { return "" } for _, addr := range addrs { if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() { if ipnet.IP.To4() != nil { return ipnet.IP.String() } } } return "" }