Files
codexautopool/backend/cmd/main.go

1163 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package main
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"codex-pool/internal/api"
"codex-pool/internal/config"
"codex-pool/internal/database"
"codex-pool/internal/logger"
"codex-pool/internal/mail"
"codex-pool/internal/register"
"codex-pool/internal/web"
)
// main prints the startup banner, prepares the data directory and database,
// loads the configuration from the database, starts the background services
// and finally runs the HTTP server, which blocks until it fails.
func main() {
	// ANSI color escape sequences used for console output.
	const (
		reset  = "\033[0m"
		red    = "\033[31m"
		green  = "\033[32m"
		yellow = "\033[33m"
		cyan   = "\033[36m"
		gray   = "\033[90m"
		bold   = "\033[1m"
	)

	fmt.Printf("%s%s============================================================%s\n", bold, cyan, reset)
	fmt.Printf("%s%s Codex Pool - HTTP API Server%s\n", bold, cyan, reset)
	fmt.Printf("%s%s============================================================%s\n\n", bold, cyan, reset)

	// Make sure the data directory exists; fall back to the working
	// directory when it cannot be created.
	dataDir := "data"
	if mkErr := os.MkdirAll(dataDir, 0755); mkErr != nil {
		fmt.Printf("%s[WARN]%s 创建数据目录失败: %v, 使用当前目录\n", yellow, reset, mkErr)
		dataDir = "."
	}

	// The database is initialized before the configuration because the
	// configuration is loaded from it.
	dbPath := filepath.Join(dataDir, "codex-pool.db")
	if initErr := database.Init(dbPath); initErr != nil {
		fmt.Printf("%s[ERROR]%s 数据库初始化失败: %v\n", red, reset, initErr)
		os.Exit(1)
	}

	// Attach the database to the config package and load the configuration.
	config.SetConfigDB(database.Instance)
	cfg := config.InitFromDB()

	// Initialize the mail services only when at least one is configured.
	if count := len(cfg.MailServices); count > 0 {
		mail.Init(cfg.MailServices)
		fmt.Printf("%s[邮箱]%s 已加载 %d 个邮箱服务\n", green, reset, count)
	}

	fmt.Printf("%s[配置]%s 数据库: %s\n", gray, reset, dbPath)
	fmt.Printf("%s[配置]%s 端口: %d\n", gray, reset, cfg.Port)

	if cfg.S2AApiBase == "" {
		fmt.Printf("%s[配置]%s S2A API: %s未配置%s (请在Web界面配置)\n", gray, reset, yellow, reset)
	} else {
		fmt.Printf("%s[配置]%s S2A API: %s\n", gray, reset, cfg.S2AApiBase)
	}

	if !cfg.ProxyEnabled {
		fmt.Printf("%s[配置]%s 代理: 已禁用\n", gray, reset)
	} else {
		fmt.Printf("%s[配置]%s 代理: %s (已启用)\n", gray, reset, cfg.DefaultProxy)
	}

	if !web.IsEmbedded() {
		fmt.Printf("%s[前端]%s 开发模式 (未嵌入)\n", yellow, reset)
	} else {
		fmt.Printf("%s[前端]%s 嵌入模式\n", green, reset)
	}
	fmt.Println()

	// Background services, started in this order:
	// auto-add checker, error-account cleaner, owner ban checker.
	api.StartAutoAddService()
	api.StartErrorCleanerService()
	api.StartBanCheckService()

	// Run the HTTP server.
	startServer(cfg)
}
// startServer registers every HTTP route on a fresh ServeMux, prints the
// addresses the server is reachable at and then serves on cfg.Port. It only
// returns by exiting the process when the listener fails.
func startServer(cfg *config.Config) {
	mux := http.NewServeMux()

	// Basic API
	mux.HandleFunc("/api/health", api.CORS(handleHealth))
	mux.HandleFunc("/api/config", api.CORS(handleConfig))
	mux.HandleFunc("/api/proxy/test", api.CORS(handleProxyTest)) // proxy test

	// Log API
	mux.HandleFunc("/api/logs", api.CORS(handleGetLogs))
	mux.HandleFunc("/api/logs/clear", api.CORS(handleClearLogs))
	mux.HandleFunc("/api/logs/clear-module", api.CORS(handleClearLogsByModule)) // clear logs of one module
	mux.HandleFunc("/api/logs/query", api.CORS(handleQueryLogs))                // query logs of one module
	mux.HandleFunc("/api/logs/stream", handleLogStream)                         // SSE live log stream

	// S2A proxy API
	mux.HandleFunc("/api/s2a/test", api.CORS(handleS2ATest))
	mux.HandleFunc("/api/s2a/proxy/", api.CORS(handleS2AProxy))                      // wildcard proxy
	mux.HandleFunc("/api/s2a/clean-errors", api.CORS(api.HandleCleanErrorAccounts)) // clean error accounts
	mux.HandleFunc("/api/s2a/cleaner/settings", api.CORS(handleCleanerSettings))    // cleaner settings

	// Mail service API
	mux.HandleFunc("/api/mail/services", api.CORS(handleMailServices))
	mux.HandleFunc("/api/mail/services/test", api.CORS(handleTestMailService))

	// Team owner API
	mux.HandleFunc("/api/db/owners", api.CORS(handleGetOwners))
	mux.HandleFunc("/api/db/owners/stats", api.CORS(handleGetOwnerStats))
	mux.HandleFunc("/api/db/owners/clear", api.CORS(handleClearOwners))
	mux.HandleFunc("/api/db/owners/clear-used", api.CORS(handleClearUsedOwners))     // clear used owners
	mux.HandleFunc("/api/db/owners/delete/", api.CORS(handleDeleteOwner))            // DELETE /api/db/owners/delete/{id}
	mux.HandleFunc("/api/db/owners/batch-delete", api.CORS(handleBatchDeleteOwners)) // POST batch delete
	mux.HandleFunc("/api/db/owners/refetch-account-ids", api.CORS(api.HandleRefetchAccountIDs))
	mux.HandleFunc("/api/upload/validate", api.CORS(api.HandleUploadValidate))

	// Owner ban-check API
	mux.HandleFunc("/api/db/owners/ban-check", api.CORS(api.HandleManualBanCheck))            // trigger a check manually
	mux.HandleFunc("/api/db/owners/ban-check/status", api.CORS(api.HandleBanCheckStatus))     // check status
	mux.HandleFunc("/api/db/owners/ban-check/settings", api.CORS(api.HandleBanCheckSettings)) // settings

	// Registration test API
	mux.HandleFunc("/api/register/test", api.CORS(handleRegisterTest))

	// Team batch processing API
	mux.HandleFunc("/api/team/process", api.CORS(api.HandleTeamProcess))
	mux.HandleFunc("/api/team/status", api.CORS(api.HandleTeamProcessStatus))
	mux.HandleFunc("/api/team/stop", api.CORS(api.HandleTeamProcessStop))

	// Batch history API (pagination + detail)
	mux.HandleFunc("/api/batch/history", api.CORS(api.HandleBatchHistory))
	mux.HandleFunc("/api/batch/detail", api.CORS(api.HandleBatchDetail))

	// Batch run records API
	mux.HandleFunc("/api/batch/runs", api.CORS(handleBatchRuns))
	mux.HandleFunc("/api/batch/stats", api.CORS(handleBatchStats))
	mux.HandleFunc("/api/batch/cleanup", api.CORS(handleBatchCleanup))

	// Monitor settings API
	mux.HandleFunc("/api/monitor/settings", api.CORS(api.HandleGetMonitorSettings))
	mux.HandleFunc("/api/monitor/settings/save", api.CORS(api.HandleSaveMonitorSettings))

	// Team-Reg automatic registration API
	mux.HandleFunc("/api/team-reg/start", api.CORS(api.HandleTeamRegStart))
	mux.HandleFunc("/api/team-reg/stop", api.CORS(api.HandleTeamRegStop))
	mux.HandleFunc("/api/team-reg/status", api.CORS(api.HandleTeamRegStatus))
	mux.HandleFunc("/api/team-reg/logs", api.HandleTeamRegLogs) // SSE
	mux.HandleFunc("/api/team-reg/import", api.CORS(api.HandleTeamRegImport))
	mux.HandleFunc("/api/team-reg/clear-logs", api.CORS(api.HandleTeamRegClearLogs))

	// CodexAuth proxy pool API (both paths are served by the same handler)
	mux.HandleFunc("/api/codex-proxy", api.CORS(api.HandleCodexProxies))
	mux.HandleFunc("/api/codex-proxy/test", api.CORS(api.HandleCodexProxies))

	// Embedded frontend static files
	if web.IsEmbedded() {
		webFS := web.GetFileSystem()
		fileServer := http.FileServer(webFS)
		mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
			// Unknown API paths must not fall through to the frontend.
			if strings.HasPrefix(r.URL.Path, "/api/") {
				http.NotFound(w, r)
				return
			}
			// SPA routing: any path without a dot is treated as a client-side
			// route and served as the index page.
			path := r.URL.Path
			if path != "/" && !strings.Contains(path, ".") {
				r.URL.Path = "/"
			}
			fileServer.ServeHTTP(w, r)
		})
	}

	addr := fmt.Sprintf(":%d", cfg.Port)

	// ANSI color codes
	colorReset := "\033[0m"
	colorGreen := "\033[32m"
	colorCyan := "\033[36m"

	// Show the addresses the server can be reached at.
	fmt.Printf("%s[服务]%s 启动于:\n", colorGreen, colorReset)
	fmt.Printf(" - 本地: %shttp://localhost:%d%s\n", colorCyan, cfg.Port, colorReset)
	if ip := getOutboundIP(); ip != "" {
		fmt.Printf(" - 外部: %shttp://%s:%d%s\n", colorCyan, ip, cfg.Port, colorReset)
	}
	fmt.Println()

	// Use an explicit http.Server so that a client which never finishes
	// sending its request headers cannot hold a connection open forever.
	// Only the header read is bounded: the SSE endpoints keep their
	// responses open indefinitely, so no write timeout is configured.
	srv := &http.Server{
		Addr:              addr,
		Handler:           mux,
		ReadHeaderTimeout: 10 * time.Second,
	}
	if err := srv.ListenAndServe(); err != nil {
		fmt.Printf("\033[31m[ERROR]\033[0m 服务启动失败: %v\n", err)
		os.Exit(1)
	}
}
// ==================== API handlers ====================
// handleHealth answers the health probe with a fixed {"status": "ok"} payload.
func handleHealth(w http.ResponseWriter, r *http.Request) {
	payload := map[string]string{"status": "ok"}
	api.Success(w, payload)
}
// handleConfig serves /api/config.
//
// GET returns the current global configuration together with the stored
// proxy test results. PUT applies the fields present in the JSON body to the
// global configuration (absent fields are left untouched) and persists the
// result through config.Update. Any other method yields 405.
func handleConfig(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		// Read the configuration.
		if config.Global == nil {
			api.Error(w, http.StatusInternalServerError, "配置未加载")
			return
		}
		api.Success(w, map[string]interface{}{
			"port":                       config.Global.Port,
			"s2a_api_base":               config.Global.S2AApiBase,
			"s2a_admin_key":              config.Global.S2AAdminKey,
			"has_admin_key":              config.Global.S2AAdminKey != "",
			"concurrency":                config.Global.Concurrency,
			"priority":                   config.Global.Priority,
			"group_ids":                  config.Global.GroupIDs,
			"proxy_enabled":              config.Global.ProxyEnabled,
			"default_proxy":              config.Global.DefaultProxy,
			"team_reg_proxy":             config.Global.TeamRegProxy,
			"proxy_test_status":          getProxyTestStatus(),
			"proxy_test_ip":              getProxyTestIP(),
			"team_reg_proxy_test_status": getTeamRegProxyTestStatus(),
			"team_reg_proxy_test_ip":     getTeamRegProxyTestIP(),
			"site_name":                  config.Global.SiteName,
			"auth_method":                config.Global.AuthMethod,
			"mail_services_count":        len(config.Global.MailServices),
			"mail_services":              config.Global.MailServices,
		})

	case http.MethodPut:
		// Update the configuration. The same guard as GET is required here:
		// every assignment below dereferences config.Global.
		if config.Global == nil {
			api.Error(w, http.StatusInternalServerError, "配置未加载")
			return
		}

		// Pointer fields distinguish "not sent" from "sent as zero value".
		var req struct {
			S2AApiBase   *string `json:"s2a_api_base"`
			S2AAdminKey  *string `json:"s2a_admin_key"`
			Concurrency  *int    `json:"concurrency"`
			Priority     *int    `json:"priority"`
			GroupIDs     []int   `json:"group_ids"`
			ProxyEnabled *bool   `json:"proxy_enabled"`
			DefaultProxy *string `json:"default_proxy"`
			TeamRegProxy *string `json:"team_reg_proxy"`
			SiteName     *string `json:"site_name"`
			AuthMethod   *string `json:"auth_method"`
		}
		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
			api.Error(w, http.StatusBadRequest, "请求格式错误")
			return
		}

		// Apply only the fields that were provided.
		if req.S2AApiBase != nil {
			config.Global.S2AApiBase = *req.S2AApiBase
		}
		if req.S2AAdminKey != nil {
			config.Global.S2AAdminKey = *req.S2AAdminKey
		}
		if req.Concurrency != nil {
			config.Global.Concurrency = *req.Concurrency
		}
		if req.Priority != nil {
			config.Global.Priority = *req.Priority
		}
		if req.GroupIDs != nil {
			config.Global.GroupIDs = req.GroupIDs
		}
		if req.ProxyEnabled != nil {
			config.Global.ProxyEnabled = *req.ProxyEnabled
		}
		if req.DefaultProxy != nil {
			config.Global.DefaultProxy = *req.DefaultProxy
			// A previous test result no longer describes the new proxy
			// address, so reset the stored test state.
			if database.Instance != nil {
				if err := database.Instance.SetConfig("proxy_test_status", "unknown"); err != nil {
					logger.Warning(fmt.Sprintf("重置代理测试状态失败: %v", err), "", "config")
				}
				if err := database.Instance.SetConfig("proxy_test_ip", ""); err != nil {
					logger.Warning(fmt.Sprintf("重置代理测试状态失败: %v", err), "", "config")
				}
			}
		}
		if req.TeamRegProxy != nil {
			config.Global.TeamRegProxy = *req.TeamRegProxy
		}
		if req.SiteName != nil {
			config.Global.SiteName = *req.SiteName
		}
		if req.AuthMethod != nil {
			config.Global.AuthMethod = *req.AuthMethod
		}

		// Persist to the database (takes effect immediately).
		if err := config.Update(config.Global); err != nil {
			api.Error(w, http.StatusInternalServerError, fmt.Sprintf("保存配置失败: %v", err))
			return
		}
		logger.Success("配置已更新并保存到数据库", "", "config")
		api.Success(w, map[string]string{"message": "配置已更新"})

	default:
		api.Error(w, http.StatusMethodNotAllowed, "不支持的方法")
	}
}
func handleGetLogs(w http.ResponseWriter, r *http.Request) {
logs := logger.GetLogs(200)
api.Success(w, logs)
}
// handleClearLogs clears all logs via logger.ClearLogs and confirms with a
// message payload.
func handleClearLogs(w http.ResponseWriter, r *http.Request) {
	logger.ClearLogs()

	reply := map[string]string{"message": "日志已清空"}
	api.Success(w, reply)
}
// handleClearLogsByModule handles POST /api/logs/clear-module?module=cleaner.
// It clears the logs of the named module and reports how many entries were
// cleared. Non-POST requests get 405, a missing module parameter gets 400.
func handleClearLogsByModule(w http.ResponseWriter, r *http.Request) {
	name := r.URL.Query().Get("module")

	switch {
	case r.Method != http.MethodPost:
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	case name == "":
		api.Error(w, http.StatusBadRequest, "缺少 module 参数")
		return
	}

	count := logger.ClearLogsByModule(name)
	summary := fmt.Sprintf("已清除 %d 条 %s 日志", count, name)
	api.Success(w, map[string]interface{}{
		"message": summary,
		"cleared": count,
	})
}
// handleQueryLogs handles
// GET /api/logs/query?module=cleaner&page=1&page_size=5&level=success.
//
// module is required. page defaults to 1 and page_size to 5; values that are
// not positive integers (or a page_size above 100) are ignored in favour of
// the defaults. level is an optional filter.
func handleQueryLogs(w http.ResponseWriter, r *http.Request) {
	params := r.URL.Query()

	module := params.Get("module")
	if module == "" {
		api.Error(w, http.StatusBadRequest, "缺少 module 参数")
		return
	}

	// An absent parameter yields "", which Atoi rejects, so the default stays.
	page, pageSize := 1, 5
	if n, err := strconv.Atoi(params.Get("page")); err == nil && n > 0 {
		page = n
	}
	if n, err := strconv.Atoi(params.Get("page_size")); err == nil && n > 0 && n <= 100 {
		pageSize = n
	}

	// Use the level-aware query only when a level filter was supplied.
	var (
		entries []logger.LogEntry
		total   int
	)
	if level := params.Get("level"); level == "" {
		entries, total = logger.GetLogsByModule(module, page, pageSize)
	} else {
		entries, total = logger.GetLogsByModuleAndLevel(module, level, page, pageSize)
	}

	// Ceiling division: number of pages needed to hold total entries.
	pageCount := (total + pageSize - 1) / pageSize

	api.Success(w, map[string]interface{}{
		"logs":        entries,
		"total":       total,
		"page":        page,
		"page_size":   pageSize,
		"total_pages": pageCount,
	})
}
// handleBatchRuns responds with the batch run records returned by
// GetBatchRuns(50).
func handleBatchRuns(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	const maxRuns = 50
	records, queryErr := db.GetBatchRuns(maxRuns)
	if queryErr != nil {
		msg := fmt.Sprintf("获取记录失败: %v", queryErr)
		api.Error(w, http.StatusInternalServerError, msg)
		return
	}

	api.Success(w, records)
}
// handleBatchStats responds with the batch run statistics from the database.
func handleBatchStats(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	api.Success(w, db.GetBatchRunStats())
}
// handleBatchCleanup cleans up batch records stuck in the running state
// (POST only) and reports the number of affected records.
func handleBatchCleanup(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	case database.Instance == nil:
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	count, cleanErr := database.Instance.CleanupStuckBatchRuns()
	if cleanErr != nil {
		msg := fmt.Sprintf("清理失败: %v", cleanErr)
		api.Error(w, http.StatusInternalServerError, msg)
		return
	}

	api.Success(w, map[string]interface{}{
		"message":  "清理完成",
		"affected": count,
	})
}
// handleLogStream streams log entries to the client as Server-Sent Events.
//
// It registers a listener with the logger under a time-based ID, sends one
// "connected" event and then forwards every entry received on the listener
// channel as a "log" event until the request context is cancelled, the
// channel is closed, or a write to the client fails. The listener is removed
// on return.
func handleLogStream(w http.ResponseWriter, r *http.Request) {
	// Verify streaming support first, so that no listener is registered and
	// no SSE headers are set for a response that cannot be streamed.
	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "SSE not supported", http.StatusInternalServerError)
		return
	}

	// SSE response headers.
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	// Subscribe under an ID derived from the current time.
	listenerID := fmt.Sprintf("client-%d", time.Now().UnixNano())
	logChan := logger.AddListener(listenerID)
	defer logger.RemoveListener(listenerID)

	// Initial connection acknowledgement.
	if _, err := fmt.Fprintf(w, "data: {\"type\":\"connected\",\"id\":\"%s\"}\n\n", listenerID); err != nil {
		return
	}
	flusher.Flush()

	// Forward log entries until the client goes away.
	ctx := r.Context()
	for {
		select {
		case <-ctx.Done():
			return
		case entry, ok := <-logChan:
			if !ok {
				return
			}
			data, err := json.Marshal(map[string]interface{}{
				"type":      "log",
				"timestamp": entry.Timestamp.Format("15:04:05"),
				"level":     entry.Level,
				"message":   entry.Message,
				"email":     entry.Email,
				"module":    entry.Module,
			})
			if err != nil {
				// Skip an entry that cannot be encoded instead of emitting
				// an empty event.
				continue
			}
			if _, err := fmt.Fprintf(w, "data: %s\n\n", data); err != nil {
				// The connection is gone; stop streaming.
				return
			}
			flusher.Flush()
		}
	}
}
// handleS2ATest reports whether an S2A API base is configured. It only
// inspects the configuration; no request is sent to S2A.
func handleS2ATest(w http.ResponseWriter, r *http.Request) {
	cfg := config.Global
	configured := cfg != nil && cfg.S2AApiBase != ""
	if !configured {
		api.Error(w, http.StatusBadRequest, "S2A 配置未设置")
		return
	}

	api.Success(w, map[string]interface{}{
		"connected": true,
		"message":   "S2A 配置已就绪",
	})
}
// handleS2AProxy forwards a request under /api/s2a/proxy/ to the configured
// S2A API and relays the upstream status, headers and body to the client.
//
// The part of the path after /api/s2a/proxy is used as the target path when
// it already starts with /api/; otherwise /api/v1/admin is prepended. The
// query string is passed through and the admin key is attached in several
// authentication headers.
func handleS2AProxy(w http.ResponseWriter, r *http.Request) {
	if config.Global == nil || config.Global.S2AApiBase == "" || config.Global.S2AAdminKey == "" {
		api.Error(w, http.StatusBadRequest, "S2A 配置未设置")
		return
	}

	// Extract the path: /api/s2a/proxy/xxx -> target path.
	path := strings.TrimPrefix(r.URL.Path, "/api/s2a/proxy")

	// Paths that do not start with /api/ default to the /api/v1/admin prefix
	// (e.g. dashboard statistics); paths already starting with /api/
	// (e.g. /api/pool/polling) are kept as they are.
	var targetPath string
	if strings.HasPrefix(path, "/api/") {
		targetPath = path
	} else {
		targetPath = "/api/v1/admin" + path
	}

	targetURL := config.Global.S2AApiBase + targetPath
	if r.URL.RawQuery != "" {
		targetURL += "?" + r.URL.RawQuery
	}
	logger.Info(fmt.Sprintf("S2A Proxy: %s", targetPath), "", "proxy")

	// Bind the upstream request to the incoming request's context so that it
	// is cancelled when the client disconnects instead of running until the
	// client timeout.
	proxyReq, err := http.NewRequestWithContext(r.Context(), r.Method, targetURL, r.Body)
	if err != nil {
		api.Error(w, http.StatusInternalServerError, "创建请求失败")
		return
	}

	// Authentication headers: the same key is sent under several header
	// names.
	adminKey := config.Global.S2AAdminKey
	proxyReq.Header.Set("Authorization", "Bearer "+adminKey)
	proxyReq.Header.Set("X-API-Key", adminKey)
	proxyReq.Header.Set("X-Admin-Key", adminKey)
	proxyReq.Header.Set("Content-Type", "application/json")
	proxyReq.Header.Set("Accept", "application/json")

	// Send the request.
	client := &http.Client{Timeout: 30 * time.Second}
	resp, err := client.Do(proxyReq)
	if err != nil {
		logger.Error(fmt.Sprintf("S2A 请求失败: %v", err), "", "proxy")
		api.Error(w, http.StatusBadGateway, fmt.Sprintf("请求 S2A 失败: %v", err))
		return
	}
	defer resp.Body.Close()

	// The body is read fully because its length is logged below.
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error(fmt.Sprintf("读取响应失败: %v", err), "", "proxy")
		api.Error(w, http.StatusBadGateway, "读取响应失败")
		return
	}

	// Log the response status (status code and length only).
	logger.Info(fmt.Sprintf("S2A 响应: status=%d, len=%d", resp.StatusCode, len(bodyBytes)), "", "proxy")

	// Copy the response headers.
	for key, values := range resp.Header {
		for _, value := range values {
			w.Header().Add(key, value)
		}
	}

	// Copy the response status and body.
	w.WriteHeader(resp.StatusCode)
	w.Write(bodyBytes)
}
// handleMailServices serves /api/mail/services.
//
// GET returns the full configuration of every mail service (including the
// token) for the frontend to load. POST replaces the mail service list: it
// re-initializes the mail package, updates the global configuration and
// stores the list in the database under "mail_services". Other methods
// yield 405.
func handleMailServices(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodGet:
		current := mail.GetServices()
		listing := make([]map[string]interface{}, 0, len(current))
		for _, svc := range current {
			listing = append(listing, map[string]interface{}{
				"name":       svc.Name,
				"apiBase":    svc.APIBase,
				"apiToken":   svc.APIToken,
				"domain":     svc.Domain,
				"emailPath":  svc.EmailPath,
				"addUserApi": svc.AddUserAPI,
			})
		}
		api.Success(w, listing)

	case http.MethodPost:
		var body struct {
			Services []struct {
				Name       string `json:"name"`
				APIBase    string `json:"apiBase"`
				APIToken   string `json:"apiToken"`
				Domain     string `json:"domain"`
				EmailPath  string `json:"emailPath"`
				AddUserAPI string `json:"addUserApi"`
			} `json:"services"`
		}
		if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
			logger.Error(fmt.Sprintf("解析邮箱服务配置失败: %v", decodeErr), "", "mail")
			api.Error(w, http.StatusBadRequest, "解析请求失败")
			return
		}

		// Convert to config.MailServiceConfig, filling in the default API
		// paths where the request left them empty.
		var services []config.MailServiceConfig
		for _, in := range body.Services {
			svc := config.MailServiceConfig{
				Name:       in.Name,
				APIBase:    in.APIBase,
				APIToken:   in.APIToken,
				Domain:     in.Domain,
				EmailPath:  in.EmailPath,
				AddUserAPI: in.AddUserAPI,
			}
			if svc.EmailPath == "" {
				svc.EmailPath = "/api/public/emailList"
			}
			if svc.AddUserAPI == "" {
				svc.AddUserAPI = "/api/public/addUser"
			}
			services = append(services, svc)
		}

		// In-memory state: mail package first, then the global config.
		mail.Init(services)
		if config.Global != nil {
			config.Global.MailServices = services
		}

		// Persist to the database; a failure is logged but does not fail
		// the request.
		if database.Instance != nil {
			encoded, _ := json.Marshal(services)
			if saveErr := database.Instance.SetConfig("mail_services", string(encoded)); saveErr != nil {
				logger.Error(fmt.Sprintf("保存邮箱配置到数据库失败: %v", saveErr), "", "mail")
			}
		}

		saved := len(services)
		logger.Success(fmt.Sprintf("邮箱服务配置已保存: %d 个服务", saved), "", "mail")
		for _, svc := range services {
			logger.Info(fmt.Sprintf(" - %s (%s) @ %s", svc.Name, svc.Domain, svc.APIBase), "", "mail")
		}

		api.Success(w, map[string]interface{}{
			"message": fmt.Sprintf("已保存 %d 个邮箱服务配置", saved),
			"count":   saved,
		})

	default:
		api.Error(w, http.StatusMethodNotAllowed, "不支持的方法")
	}
}
// handleTestMailService tests a mail service (POST only) by asking its
// /api/public/addUser endpoint to create the user test@<domain>.
//
// The test counts as successful when the service answers with code 200 or
// with a message indicating that the mailbox already exists; either outcome
// shows that the endpoint and token work.
func handleTestMailService(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	var req struct {
		Name     string `json:"name"`
		APIBase  string `json:"api_base"`
		APIToken string `json:"api_token"`
		Domain   string `json:"domain"`
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		api.Error(w, http.StatusBadRequest, "解析请求失败")
		return
	}

	logger.Info(fmt.Sprintf("测试邮箱服务: %s (%s)", req.Name, req.Domain), "", "mail")
	logger.Info(fmt.Sprintf("API Base: %s", req.APIBase), "", "mail")

	// Build the test request.
	testEmail := "test@" + req.Domain
	payload := map[string]interface{}{
		"list": []map[string]string{
			{"email": testEmail, "password": "TestPass123#"},
		},
	}
	// Marshalling a map of plain strings cannot fail.
	jsonData, _ := json.Marshal(payload)

	apiURL := req.APIBase + "/api/public/addUser"
	// Bind the outbound call to the incoming request so that it is cancelled
	// when the client disconnects.
	httpReq, err := http.NewRequestWithContext(r.Context(), http.MethodPost, apiURL, bytes.NewReader(jsonData))
	if err != nil {
		logger.Error(fmt.Sprintf("创建请求失败: %v", err), "", "mail")
		api.Error(w, http.StatusInternalServerError, "创建请求失败")
		return
	}
	httpReq.Header.Set("Authorization", req.APIToken)
	httpReq.Header.Set("Content-Type", "application/json")

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(httpReq)
	if err != nil {
		logger.Error(fmt.Sprintf("请求失败: %v", err), "", "mail")
		api.Error(w, http.StatusBadGateway, fmt.Sprintf("连接失败: %v", err))
		return
	}
	defer resp.Body.Close()

	// Report a truncated or failed read as such instead of letting it
	// surface as a confusing JSON parse error below.
	bodyBytes, err := io.ReadAll(resp.Body)
	if err != nil {
		logger.Error(fmt.Sprintf("读取响应失败: %v", err), "", "mail")
		api.Error(w, http.StatusBadGateway, "读取响应失败")
		return
	}
	bodyStr := string(bodyBytes)
	logger.Info(fmt.Sprintf("响应状态: %d", resp.StatusCode), "", "mail")
	logger.Info(fmt.Sprintf("响应内容: %s", bodyStr), "", "mail")

	// Parse the response.
	var result struct {
		Code    int    `json:"code"`
		Message string `json:"message"`
	}
	if err := json.Unmarshal(bodyBytes, &result); err != nil {
		logger.Error(fmt.Sprintf("解析响应失败: %v", err), "", "mail")
		api.Error(w, http.StatusInternalServerError, "解析响应失败")
		return
	}

	// Evaluate the result:
	//   code 200                               = user created
	//   message contains "exist" / "已存在"     = mailbox already exists
	//   code 501 with a message naming "邮箱"   = mailbox-level error
	// The last two are also treated as a working connection.
	isSuccess := result.Code == 200 ||
		strings.Contains(result.Message, "exist") ||
		strings.Contains(result.Message, "已存在") ||
		(result.Code == 501 && strings.Contains(result.Message, "邮箱"))

	if !isSuccess {
		logger.Error(fmt.Sprintf("邮箱服务测试失败: %s - %s", req.Name, result.Message), "", "mail")
		api.Error(w, http.StatusBadRequest, fmt.Sprintf("API 错误: %s", result.Message))
		return
	}

	logger.Success(fmt.Sprintf("邮箱服务测试成功: %s (邮箱已存在或创建成功)", req.Name), "", "mail")
	api.Success(w, map[string]interface{}{
		"connected": true,
		"message":   "邮箱服务连接成功",
		"response":  result,
	})
}
// handleGetOwners lists team owners with pagination.
//
// Query parameters: status (passed through to the database query), limit
// (positive integer, default 20) and offset (non-negative integer,
// default 0). Invalid numeric values fall back to the defaults.
func handleGetOwners(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	params := r.URL.Query()
	statusFilter := params.Get("status")

	// An absent parameter yields "", which Atoi rejects, so the default stays.
	limit, offset := 20, 0
	if n, err := strconv.Atoi(params.Get("limit")); err == nil && n > 0 {
		limit = n
	}
	if n, err := strconv.Atoi(params.Get("offset")); err == nil && n >= 0 {
		offset = n
	}

	owners, total, queryErr := db.GetTeamOwners(statusFilter, limit, offset)
	if queryErr != nil {
		api.Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", queryErr))
		return
	}

	api.Success(w, map[string]interface{}{
		"owners": owners,
		"total":  total,
	})
}
// handleGetOwnerStats responds with the owner statistics from the database.
func handleGetOwnerStats(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	api.Success(w, db.GetOwnerStats())
}
// handleClearOwners clears the team owners via ClearTeamOwners and confirms
// with a message payload.
func handleClearOwners(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	clearErr := db.ClearTeamOwners()
	if clearErr != nil {
		api.Error(w, http.StatusInternalServerError, fmt.Sprintf("清空失败: %v", clearErr))
		return
	}

	api.Success(w, map[string]string{"message": "已清空"})
}
// handleClearUsedOwners serves /api/db/owners/clear-used: it clears the
// owners that have already been used and reports how many were removed.
func handleClearUsedOwners(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	removed, clearErr := db.ClearUsedOwners()
	if clearErr != nil {
		api.Error(w, http.StatusInternalServerError, fmt.Sprintf("清理失败: %v", clearErr))
		return
	}

	summary := fmt.Sprintf("已清理 %d 个已使用的母号", removed)
	api.Success(w, map[string]interface{}{
		"message": summary,
		"deleted": removed,
	})
}
// handleDeleteOwner serves /api/db/owners/delete/{id} and deletes a single
// owner. Both DELETE and POST are accepted; the ID is the integer that
// follows the route prefix.
func handleDeleteOwner(w http.ResponseWriter, r *http.Request) {
	switch r.Method {
	case http.MethodDelete, http.MethodPost:
		// accepted
	default:
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 DELETE/POST")
		return
	}

	if database.Instance == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	// Take the ID from the URL: /api/db/owners/delete/{id}
	rawID := strings.TrimPrefix(r.URL.Path, "/api/db/owners/delete/")
	ownerID, parseErr := strconv.Atoi(rawID)
	if parseErr != nil {
		api.Error(w, http.StatusBadRequest, "无效的 ID")
		return
	}

	delErr := database.Instance.DeleteTeamOwner(int64(ownerID))
	if delErr != nil {
		api.Error(w, http.StatusInternalServerError, fmt.Sprintf("删除失败: %v", delErr))
		return
	}

	api.Success(w, map[string]string{"message": "已删除"})
}
// handleBatchDeleteOwners handles POST /api/db/owners/batch-delete. It
// deletes every owner whose ID appears in the "ids" array of the JSON body
// and reports how many deletions succeeded; individual failures are skipped.
func handleBatchDeleteOwners(w http.ResponseWriter, r *http.Request) {
	switch {
	case r.Method != http.MethodPost:
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	case database.Instance == nil:
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	var body struct {
		IDs []int `json:"ids"`
	}
	if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
		api.Error(w, http.StatusBadRequest, "请求格式错误")
		return
	}
	if len(body.IDs) == 0 {
		api.Error(w, http.StatusBadRequest, "请选择要删除的账号")
		return
	}

	var removed int
	for _, ownerID := range body.IDs {
		if delErr := database.Instance.DeleteTeamOwner(int64(ownerID)); delErr != nil {
			continue
		}
		removed++
	}

	api.Success(w, map[string]interface{}{
		"message": fmt.Sprintf("已删除 %d 个账号", removed),
		"deleted": removed,
	})
}
// handleRegisterTest handles POST /api/register/test and runs one test
// registration with generated credentials.
//
// The JSON body may carry a "proxy"; when it is empty the configured default
// proxy is used. On success the generated email, password and name are
// returned together with the access token of the registration.
func handleRegisterTest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	var body struct {
		Proxy string `json:"proxy"`
	}
	// A decode failure is ignored here: body.Proxy then stays empty and the
	// default proxy below is used.
	_ = json.NewDecoder(r.Body).Decode(&body)

	// Fall back to the default proxy from the configuration.
	proxyAddr := body.Proxy
	if proxyAddr == "" && config.Global != nil {
		proxyAddr = config.Global.DefaultProxy
	}

	// Generate the test data.
	var (
		email     = mail.GenerateEmail()
		password  = register.GeneratePassword()
		name      = register.GenerateName()
		birthdate = register.GenerateBirthdate()
	)
	logger.Info(fmt.Sprintf("开始注册测试: %s", email), email, "register")

	// Run the registration.
	result, runErr := register.Run(email, password, name, birthdate, proxyAddr)
	if runErr != nil {
		logger.Error(fmt.Sprintf("注册失败: %v", runErr), email, "register")
		api.Error(w, http.StatusInternalServerError, fmt.Sprintf("注册失败: %v", runErr))
		return
	}
	logger.Success(fmt.Sprintf("注册成功: %s", email), email, "register")

	// Return the result.
	api.Success(w, map[string]interface{}{
		"email":        email,
		"password":     password,
		"name":         name,
		"access_token": result.AccessToken,
	})
}
// getOutboundIP returns the local IP address this host uses for outbound
// traffic, or "" when none can be determined.
//
// It first opens a UDP socket towards 8.8.8.8:80 and reads the socket's
// local address. If that yields nothing usable it falls back to the first
// non-loopback IPv4 address among the interface addresses.
func getOutboundIP() string {
	// Method 1: local address of a socket dialed towards a public address.
	if conn, err := net.Dial("udp", "8.8.8.8:80"); err == nil {
		defer conn.Close()
		// Checked assertion: an unexpected address type must not panic, it
		// simply falls through to the interface scan below.
		if udpAddr, ok := conn.LocalAddr().(*net.UDPAddr); ok && len(udpAddr.IP) > 0 {
			return udpAddr.IP.String()
		}
	}

	// Method 2: walk the interface addresses.
	addrs, err := net.InterfaceAddrs()
	if err != nil {
		return ""
	}
	for _, addr := range addrs {
		ipnet, ok := addr.(*net.IPNet)
		if !ok || ipnet.IP.IsLoopback() {
			continue
		}
		if ipnet.IP.To4() != nil {
			return ipnet.IP.String()
		}
	}
	return ""
}
// handleCleanerSettings serves GET/POST /api/s2a/cleaner/settings.
//
// GET returns the stored "enabled" flag, the cleaning interval in seconds
// (3600 when nothing valid is stored) and the current cleaner status. POST
// stores the provided "enabled" flag and/or "interval"; an interval below 60
// is not stored.
func handleCleanerSettings(w http.ResponseWriter, r *http.Request) {
	db := database.Instance
	if db == nil {
		api.Error(w, http.StatusInternalServerError, "数据库未初始化")
		return
	}

	switch r.Method {
	case http.MethodGet:
		// Read the cleaner settings.
		flag, _ := db.GetConfig("error_clean_enabled")
		enabled := flag == "true"

		interval := 3600 // default: one hour
		if raw, _ := db.GetConfig("error_clean_interval"); raw != "" {
			if seconds, convErr := strconv.Atoi(raw); convErr == nil {
				interval = seconds
			}
		}

		api.Success(w, map[string]interface{}{
			"enabled":  enabled,
			"interval": interval,
			"status":   api.GetCleanerStatus(),
		})

	case http.MethodPost:
		// Save the cleaner settings; nil pointers mean "field not sent".
		var body struct {
			Enabled  *bool `json:"enabled"`
			Interval *int  `json:"interval"`
		}
		if decodeErr := json.NewDecoder(r.Body).Decode(&body); decodeErr != nil {
			api.Error(w, http.StatusBadRequest, "请求格式错误")
			return
		}

		if body.Enabled != nil {
			on := *body.Enabled
			db.SetConfig("error_clean_enabled", strconv.FormatBool(on))
			if !on {
				logger.Info("定期清理错误账号已禁用", "", "cleaner")
			} else {
				logger.Success("定期清理错误账号已启用", "", "cleaner")
			}
		}

		if body.Interval != nil {
			if seconds := *body.Interval; seconds >= 60 {
				db.SetConfig("error_clean_interval", strconv.Itoa(seconds))
				logger.Info(fmt.Sprintf("清理间隔已设置为 %d 秒", seconds), "", "cleaner")
			}
		}

		api.Success(w, map[string]string{"message": "清理设置已保存"})

	default:
		api.Error(w, http.StatusMethodNotAllowed, "不支持的方法")
	}
}
// handleProxyTest handles POST /api/proxy/test: it fetches
// https://httpbin.org/ip through the given proxy and reports the exit IP.
//
// The outcome ("success" or "error") and the exit IP are stored in the
// database under the keys of the default proxy, or of the team-reg proxy
// when proxy_type is "team_reg".
func handleProxyTest(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		api.Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
		return
	}

	var req struct {
		ProxyURL  string `json:"proxy_url"`
		ProxyType string `json:"proxy_type"` // "default" or "team_reg"
	}
	if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
		api.Error(w, http.StatusBadRequest, "请求格式错误")
		return
	}

	proxyURL := req.ProxyURL
	if proxyURL == "" {
		api.Error(w, http.StatusBadRequest, "代理地址不能为空")
		return
	}

	// Pick the config keys the result is stored under.
	statusKey := "proxy_test_status"
	ipKey := "proxy_test_ip"
	if req.ProxyType == "team_reg" {
		statusKey = "team_reg_proxy_test_status"
		ipKey = "team_reg_proxy_test_ip"
	}

	// recordResult persists a test outcome; storage failures are logged and
	// do not affect the response.
	recordResult := func(status, ip string) {
		if database.Instance == nil {
			return
		}
		if err := database.Instance.SetConfig(statusKey, status); err != nil {
			logger.Warning(fmt.Sprintf("保存代理测试状态失败: %v", err), "", "proxy")
		}
		if err := database.Instance.SetConfig(ipKey, ip); err != nil {
			logger.Warning(fmt.Sprintf("保存代理测试状态失败: %v", err), "", "proxy")
		}
	}

	logger.Info(fmt.Sprintf("测试代理连接: %s", proxyURL), "", "proxy")

	// Parse the proxy URL.
	proxyParsed, err := parseProxyURL(proxyURL)
	if err != nil {
		logger.Error(fmt.Sprintf("代理地址格式错误: %v", err), "", "proxy")
		api.Error(w, http.StatusBadRequest, fmt.Sprintf("代理地址格式错误: %v", err))
		return
	}

	// A dedicated transport is needed for the per-request proxy. Its idle
	// connections are closed on return; otherwise every test would leave a
	// pooled connection behind, because a custom Transport has no idle
	// timeout by default.
	transport := &http.Transport{Proxy: http.ProxyURL(proxyParsed)}
	defer transport.CloseIdleConnections()
	client := &http.Client{
		Timeout:   15 * time.Second,
		Transport: transport,
	}

	// Test request: httpbin.org/ip echoes the exit IP. It is bound to the
	// incoming request so that a client disconnect cancels it.
	testURL := "https://httpbin.org/ip"
	testReq, err := http.NewRequestWithContext(r.Context(), http.MethodGet, testURL, nil)
	if err != nil {
		api.Error(w, http.StatusInternalServerError, "创建请求失败")
		return
	}
	resp, err := client.Do(testReq)
	if err != nil {
		logger.Error(fmt.Sprintf("代理连接失败: %v", err), "", "proxy")
		// A failed connection is a failed test: store it like the non-200
		// case below so that the saved status does not keep an older
		// "success".
		recordResult("error", "")
		api.Error(w, http.StatusBadGateway, fmt.Sprintf("代理连接失败: %v", err))
		return
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		logger.Error(fmt.Sprintf("代理测试失败: HTTP %d", resp.StatusCode), "", "proxy")
		recordResult("error", "")
		api.Error(w, http.StatusBadGateway, fmt.Sprintf("代理测试失败: HTTP %d", resp.StatusCode))
		return
	}

	// Parse the response to obtain the exit IP. A parse failure is only a
	// warning: the proxy did answer with 200, the IP is just left empty.
	var ipResp struct {
		Origin string `json:"origin"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&ipResp); err != nil {
		logger.Warning(fmt.Sprintf("解析代理响应失败: %v", err), "", "proxy")
	}
	logger.Success(fmt.Sprintf("代理连接成功, 出口IP: %s", ipResp.Origin), "", "proxy")

	// Store the successful outcome.
	recordResult("success", ipResp.Origin)

	api.Success(w, map[string]interface{}{
		"connected": true,
		"message":   "代理连接成功",
		"origin_ip": ipResp.Origin,
	})
}
// getProxyTestStatus returns the stored "proxy_test_status" value, or
// "unknown" when the database is unavailable or nothing is stored.
func getProxyTestStatus() string {
	const fallback = "unknown"

	if database.Instance == nil {
		return fallback
	}
	status, _ := database.Instance.GetConfig("proxy_test_status")
	if status == "" {
		return fallback
	}
	return status
}
// getProxyTestIP returns the stored "proxy_test_ip" value, or "" when the
// database is unavailable.
func getProxyTestIP() string {
	if db := database.Instance; db != nil {
		ip, _ := db.GetConfig("proxy_test_ip")
		return ip
	}
	return ""
}
// getTeamRegProxyTestStatus returns the stored "team_reg_proxy_test_status"
// value, or "unknown" when the database is unavailable or nothing is stored.
func getTeamRegProxyTestStatus() string {
	const fallback = "unknown"

	if database.Instance == nil {
		return fallback
	}
	status, _ := database.Instance.GetConfig("team_reg_proxy_test_status")
	if status == "" {
		return fallback
	}
	return status
}
// getTeamRegProxyTestIP returns the stored "team_reg_proxy_test_ip" value,
// or "" when the database is unavailable.
func getTeamRegProxyTestIP() string {
	if db := database.Instance; db != nil {
		ip, _ := db.GetConfig("team_reg_proxy_test_ip")
		return ip
	}
	return ""
}
// parseProxyURL parses a proxy address into a URL.
//
// Surrounding whitespace is trimmed. An address without any "scheme://"
// prefix (for example "127.0.0.1:8080" or "user:pass@host:3128") is treated
// as an HTTP proxy. An address that already carries a scheme is parsed
// as-is, whatever the scheme's letter case. An error is returned when the
// address cannot be parsed or contains no host.
func parseProxyURL(proxyURL string) (*url.URL, error) {
	proxyURL = strings.TrimSpace(proxyURL)

	// Prepend the default scheme only when none is present. Checking for
	// "://" rather than a fixed list keeps schemes such as socks5h:// or
	// HTTP:// intact instead of burying them behind a second "http://".
	if !strings.Contains(proxyURL, "://") {
		proxyURL = "http://" + proxyURL
	}

	parsed, err := url.Parse(proxyURL)
	if err != nil {
		return nil, err
	}
	if parsed.Host == "" {
		return nil, fmt.Errorf("proxy URL %q has no host", proxyURL)
	}
	return parsed, nil
}