package api import ( "bufio" "context" "encoding/json" "fmt" "io" "net/http" "os" "os/exec" "path/filepath" "regexp" "runtime" "sort" "strings" "sync" "time" "codex-pool/internal/database" "codex-pool/internal/logger" ) // ansiRegex 匹配 ANSI 转义序列(颜色码等) var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`) // TeamRegConfig 注册配置 type TeamRegConfig struct { Count int `json:"count"` // 注册数量 Concurrency int `json:"concurrency"` // 并发线程数 Proxy string `json:"proxy"` // 代理地址 AutoImport bool `json:"auto_import"` // 完成后自动导入 } // TeamRegState 运行状态 type TeamRegState struct { Running bool `json:"running"` StartedAt time.Time `json:"started_at"` Config TeamRegConfig `json:"config"` Logs []string `json:"logs"` OutputFile string `json:"output_file"` // 生成的 JSON 文件 Imported int `json:"imported"` // 已导入数量 mu sync.Mutex cmd *exec.Cmd cancel context.CancelFunc stdin io.WriteCloser // 自动导入与退出控制 autoImporting bool autoImported bool exitSignaled bool // 403 错误检测 error403Count int // 403 错误计数 error403Start time.Time // 计数开始时间 } // 403 自动停止配置 const ( max403Errors = 10 // 最大 403 错误数 error403Window = 60 * time.Second // 时间窗口(60秒内) ) var teamRegState = &TeamRegState{ Logs: make([]string, 0), } // HandleTeamRegStart POST /api/team-reg/start - 启动注册进程 func HandleTeamRegStart(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } teamRegState.mu.Lock() if teamRegState.Running { teamRegState.mu.Unlock() json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, "message": "已有注册任务在运行中", }) return } var config TeamRegConfig if err := json.NewDecoder(r.Body).Decode(&config); err != nil { teamRegState.mu.Unlock() http.Error(w, "Invalid request body", http.StatusBadRequest) return } // 验证参数 if config.Count < 1 { config.Count = 1 } if config.Count > 100 { config.Count = 100 } if config.Concurrency < 1 { config.Concurrency = 1 } if config.Concurrency > 10 { config.Concurrency = 10 } // 重置状态 teamRegState.Running = true teamRegState.StartedAt = time.Now() teamRegState.Config = config teamRegState.Logs = make([]string, 0) teamRegState.OutputFile = "" teamRegState.Imported = 0 teamRegState.autoImporting = false teamRegState.autoImported = false teamRegState.exitSignaled = false teamRegState.error403Count = 0 teamRegState.error403Start = time.Now() teamRegState.mu.Unlock() // 启动进程 go runTeamRegProcess(config) json.NewEncoder(w).Encode(map[string]interface{}{ "success": true, "message": "注册任务已启动", }) } // HandleTeamRegStop POST /api/team-reg/stop - 停止注册进程 func HandleTeamRegStop(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } teamRegState.mu.Lock() if !teamRegState.Running { teamRegState.mu.Unlock() json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, "message": "没有正在运行的任务", }) return } // 发送取消信号 if teamRegState.cancel != nil { teamRegState.cancel() } // 如果进程还在,强制终止 if teamRegState.cmd != nil && teamRegState.cmd.Process != nil { teamRegState.cmd.Process.Kill() } teamRegState.Running = false teamRegState.mu.Unlock() // 在释放锁之后再添加日志 addTeamRegLog("[系统] 任务已被用户停止") json.NewEncoder(w).Encode(map[string]interface{}{ "success": true, "message": "任务已停止", }) } // HandleTeamRegStatus GET /api/team-reg/status - 获取状态和日志 func HandleTeamRegStatus(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodGet { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } teamRegState.mu.Lock() state := map[string]interface{}{ "running": teamRegState.Running, "started_at": teamRegState.StartedAt, "config": teamRegState.Config, "logs": teamRegState.Logs, "output_file": teamRegState.OutputFile, "imported": teamRegState.Imported, } teamRegState.mu.Unlock() json.NewEncoder(w).Encode(state) } // HandleTeamRegLogs GET /api/team-reg/logs - SSE 实时日志流 func HandleTeamRegLogs(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("Access-Control-Allow-Origin", "*") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "Streaming not supported", http.StatusInternalServerError) return } lastIndex := 0 ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for { select { case <-r.Context().Done(): return case <-ticker.C: teamRegState.mu.Lock() running := teamRegState.Running logs := teamRegState.Logs teamRegState.mu.Unlock() // 发送新日志 if len(logs) > lastIndex { for i := lastIndex; i < len(logs); i++ { fmt.Fprintf(w, "data: %s\n\n", logs[i]) } lastIndex = len(logs) flusher.Flush() } // 发送状态 if !running { fmt.Fprintf(w, "event: done\ndata: finished\n\n") flusher.Flush() return } } } } // HandleTeamRegImport POST /api/team-reg/import - 导入生成的 JSON 到数据库 func HandleTeamRegImport(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } teamRegState.mu.Lock() outputFile := teamRegState.OutputFile teamRegState.mu.Unlock() if outputFile == "" { json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, "message": "没有可导入的文件", }) return } count, err := importAccountsFromJSON(outputFile) if err != nil { json.NewEncoder(w).Encode(map[string]interface{}{ "success": false, "message": fmt.Sprintf("导入失败: %v", err), }) return } teamRegState.mu.Lock() teamRegState.Imported = count teamRegState.mu.Unlock() json.NewEncoder(w).Encode(map[string]interface{}{ "success": true, "message": fmt.Sprintf("成功导入 %d 个账号", count), "count": count, }) } // runTeamRegProcess 执行 team-reg 进程 func runTeamRegProcess(config TeamRegConfig) { defer func() { teamRegState.mu.Lock() teamRegState.Running = false teamRegState.mu.Unlock() }() // 查找 team-reg 可执行文件 execPath := findTeamRegExecutable() if execPath == "" { addTeamRegLog("[错误] 找不到 team-reg 可执行文件") addTeamRegLog("[提示] 请确保 team-reg 文件位于 backend 目录下") return } addTeamRegLog(fmt.Sprintf("[系统] 找到可执行文件: %s", execPath)) // Linux/macOS 上自动设置执行权限 if runtime.GOOS != "windows" { if err := os.Chmod(execPath, 0755); err != nil { addTeamRegLog(fmt.Sprintf("[警告] 设置执行权限失败: %v", err)) } else { addTeamRegLog("[系统] 已设置执行权限 (chmod +x)") } } addTeamRegLog(fmt.Sprintf("[系统] 配置: 数量=%d, 并发=%d, 代理=%s", config.Count, config.Concurrency, config.Proxy)) // 创建上下文用于取消 ctx, cancel := context.WithCancel(context.Background()) teamRegState.mu.Lock() teamRegState.cancel = cancel teamRegState.mu.Unlock() // 创建命令 cmd := exec.CommandContext(ctx, execPath) // 设置工作目录(输出文件会保存在这里) workDir := filepath.Dir(execPath) cmd.Dir = workDir // 获取 stdin, stdout, stderr stdin, err := cmd.StdinPipe() if err != nil { addTeamRegLog(fmt.Sprintf("[错误] 无法获取 stdin: %v", err)) return } stdout, err := cmd.StdoutPipe() if err != nil { addTeamRegLog(fmt.Sprintf("[错误] 无法获取 stdout: %v", err)) return } stderr, err := cmd.StderrPipe() if err != nil { addTeamRegLog(fmt.Sprintf("[错误] 无法获取 stderr: %v", err)) return } teamRegState.mu.Lock() teamRegState.cmd = cmd teamRegState.stdin = stdin teamRegState.mu.Unlock() // 启动进程 addTeamRegLog("[系统] 启动 team-reg 进程...") if err := cmd.Start(); err != nil { addTeamRegLog(fmt.Sprintf("[错误] 启动失败: %v", err)) return } // 合并 stdout 和 stderr 读取 go readOutput(stdout, workDir, config) go readOutput(stderr, workDir, config) // 等待一小段时间让程序启动 time.Sleep(500 * time.Millisecond) // 发送输入参数 addTeamRegLog(fmt.Sprintf("[输入] 注册数量: %d", config.Count)) fmt.Fprintf(stdin, "%d\n", config.Count) time.Sleep(200 * time.Millisecond) addTeamRegLog(fmt.Sprintf("[输入] 并发线程数: %d", config.Concurrency)) fmt.Fprintf(stdin, "%d\n", config.Concurrency) time.Sleep(200 * time.Millisecond) addTeamRegLog(fmt.Sprintf("[输入] 代理地址: %s", config.Proxy)) fmt.Fprintf(stdin, "%s\n", config.Proxy) // 等待进程完成 err = cmd.Wait() if err != nil { if ctx.Err() == context.Canceled { addTeamRegLog("[系统] 进程已被取消") } else { addTeamRegLog(fmt.Sprintf("[系统] 进程退出: %v", err)) } } else { addTeamRegLog("[系统] 进程正常完成") } // 查找输出文件 outputFile := findLatestOutputFile(workDir) if outputFile != "" { teamRegState.mu.Lock() teamRegState.OutputFile = outputFile teamRegState.mu.Unlock() addTeamRegLog(fmt.Sprintf("[系统] 输出文件: %s", filepath.Base(outputFile))) // 自动导入 if config.AutoImport { addTeamRegLog("[系统] 自动导入账号到数据库...") tryAutoImport(outputFile, config) } } // 发送回车退出程序(如果还在运行) time.Sleep(500 * time.Millisecond) signalTeamRegExit() } // readOutput 读取进程输出 func readOutput(reader io.Reader, workDir string, config TeamRegConfig) { scanner := bufio.NewScanner(reader) for scanner.Scan() { line := scanner.Text() // 过滤空行和只有空格的行 trimmed := strings.TrimSpace(line) if trimmed != "" { addTeamRegLog(trimmed) // 检测输出文件名(例如:结果已保存到: accounts-2-20260201-071558.json) if strings.Contains(trimmed, "结果已保存到") || strings.Contains(trimmed, "accounts-") && strings.Contains(trimmed, ".json") { // 尝试提取文件名 if idx := strings.Index(trimmed, "accounts-"); idx >= 0 { endIdx := strings.Index(trimmed[idx:], ".json") if endIdx > 0 { fileName := trimmed[idx : idx+endIdx+5] // 包含 .json // 构建完整路径 fullPath := filepath.Join(workDir, fileName) teamRegState.mu.Lock() teamRegState.OutputFile = fullPath teamRegState.mu.Unlock() if config.AutoImport { go tryAutoImport(fullPath, config) } // 发送回车提示退出(有些程序会在完成后等待回车) signalTeamRegExit() } } } // 检测 403 错误 if strings.Contains(trimmed, "403") { if check403AndStop() { return // 已触发自动停止 } } } } } // check403AndStop 检查 403 错误并决定是否自动停止 // 返回 true 表示已触发停止 func check403AndStop() bool { teamRegState.mu.Lock() defer teamRegState.mu.Unlock() now := time.Now() // 如果超过时间窗口,重置计数 if now.Sub(teamRegState.error403Start) > error403Window { teamRegState.error403Count = 0 teamRegState.error403Start = now } teamRegState.error403Count++ // 如果超过阈值,自动停止 if teamRegState.error403Count >= max403Errors { // 触发停止 if teamRegState.cancel != nil { teamRegState.cancel() } if teamRegState.cmd != nil && teamRegState.cmd.Process != nil { teamRegState.cmd.Process.Kill() } teamRegState.Running = false // 不在锁内调用 addTeamRegLog,先解锁 go func() { addTeamRegLog(fmt.Sprintf("[警告] 检测到 %d 次 403 错误,自动停止注册", max403Errors)) addTeamRegLog("[提示] 可能是代理 IP 被限制,请更换代理后重试") }() return true } return false } // tryAutoImport 尝试自动导入(仅执行一次) func tryAutoImport(filePath string, config TeamRegConfig) { if !config.AutoImport || filePath == "" { return } teamRegState.mu.Lock() if teamRegState.autoImported || teamRegState.autoImporting { teamRegState.mu.Unlock() return } teamRegState.autoImporting = true teamRegState.mu.Unlock() var ( count int err error ) // 等待文件稳定写入(最多重试几次) for i := 0; i < 5; i++ { if _, statErr := os.Stat(filePath); statErr != nil { err = statErr } else { count, err = importAccountsFromJSON(filePath) } if err == nil { break } time.Sleep(300 * time.Millisecond) } teamRegState.mu.Lock() teamRegState.autoImporting = false if err == nil { teamRegState.Imported = count teamRegState.autoImported = true } teamRegState.mu.Unlock() if err != nil { addTeamRegLog(fmt.Sprintf("[错误] 导入失败: %v", err)) return } addTeamRegLog(fmt.Sprintf("[系统] 成功导入 %d 个账号", count)) // 导入成功后删除 JSON 文件 if err := os.Remove(filePath); err != nil { addTeamRegLog(fmt.Sprintf("[警告] 删除临时文件失败: %v", err)) } else { addTeamRegLog(fmt.Sprintf("[系统] 已清理临时文件: %s", filepath.Base(filePath))) } } // signalTeamRegExit 发送回车并关闭 stdin,提示程序退出 func signalTeamRegExit() { teamRegState.mu.Lock() if teamRegState.exitSignaled { teamRegState.mu.Unlock() return } teamRegState.exitSignaled = true stdin := teamRegState.stdin teamRegState.mu.Unlock() if stdin == nil { return } _, _ = fmt.Fprintln(stdin) _ = stdin.Close() } // addTeamRegLog 添加日志 func addTeamRegLog(log string) { teamRegState.mu.Lock() defer teamRegState.mu.Unlock() // 过滤 ANSI 颜色码 cleanLog := ansiRegex.ReplaceAllString(log, "") if cleanLog == "" { return } timestamp := time.Now().Format("15:04:05") fullLog := fmt.Sprintf("[%s] %s", timestamp, cleanLog) teamRegState.Logs = append(teamRegState.Logs, fullLog) // 限制日志数量 if len(teamRegState.Logs) > 1000 { teamRegState.Logs = teamRegState.Logs[len(teamRegState.Logs)-1000:] } // 同时输出到系统日志 logger.Info(fmt.Sprintf("[TeamReg] %s", cleanLog), "", "team-reg") } // findTeamRegExecutable 查找 team-reg 可执行文件 func findTeamRegExecutable() string { // 可能的文件名 var names []string if runtime.GOOS == "windows" { names = []string{"team-reg.exe", "team-reg"} } else { names = []string{"team-reg", "team-reg.exe"} } // 获取当前工作目录 cwd, _ := os.Getwd() // 获取可执行文件的真实路径(解析符号链接) execPath, _ := os.Executable() realExecPath, err := filepath.EvalSymlinks(execPath) if err == nil { execPath = realExecPath } execDir := filepath.Dir(execPath) // 可能的路径(按优先级排序) paths := []string{ execDir, // 可执行文件所在目录(最可靠,team-reg 应与后端在同一目录) cwd, // 当前工作目录 filepath.Join(cwd, "backend"), // cwd/backend filepath.Join(execDir, ".."), // 可执行文件上级目录 filepath.Join(execDir, "backend"), // execDir/backend ".", // 相对当前目录 "backend", // 相对 backend 目录 "..", // 上级目录 filepath.Join("..", "backend"), // ../backend filepath.Join("..", ".."), // 更上级 filepath.Join("..", "..", "backend"), // ../../backend } for _, basePath := range paths { for _, name := range names { fullPath := filepath.Join(basePath, name) if absPath, err := filepath.Abs(fullPath); err == nil { if _, err := os.Stat(absPath); err == nil { return absPath } } } } return "" } // findLatestOutputFile 查找最新的输出文件 func findLatestOutputFile(primaryDir string) string { // 搜索多个可能的目录 cwd, _ := os.Getwd() searchDirs := []string{ primaryDir, cwd, "/app", "/app/data", filepath.Join(cwd, "data"), ".", } var allMatches []string for _, dir := range searchDirs { pattern := filepath.Join(dir, "accounts-*.json") matches, err := filepath.Glob(pattern) if err == nil && len(matches) > 0 { allMatches = append(allMatches, matches...) } } if len(allMatches) == 0 { addTeamRegLog(fmt.Sprintf("[调试] 未找到 accounts-*.json,搜索目录: %v", searchDirs)) return "" } // 去重 uniqueMatches := make(map[string]bool) var finalMatches []string for _, m := range allMatches { absPath, _ := filepath.Abs(m) if !uniqueMatches[absPath] { uniqueMatches[absPath] = true finalMatches = append(finalMatches, absPath) } } // 按修改时间排序,取最新的 sort.Slice(finalMatches, func(i, j int) bool { fi, _ := os.Stat(finalMatches[i]) fj, _ := os.Stat(finalMatches[j]) if fi == nil || fj == nil { return false } return fi.ModTime().After(fj.ModTime()) }) // 确保是最近创建的文件(5分钟内) fi, err := os.Stat(finalMatches[0]) if err != nil { return "" } if time.Since(fi.ModTime()) > 5*time.Minute { addTeamRegLog(fmt.Sprintf("[调试] 找到文件但太旧: %s (修改于 %v 前)", finalMatches[0], time.Since(fi.ModTime()))) return "" } return finalMatches[0] } // TeamRegAccount team-reg 输出的账号格式 type TeamRegAccount struct { Account string `json:"account"` Password string `json:"password"` Token string `json:"token"` AccountID string `json:"account_id"` PlanType string `json:"plan_type"` } // importAccountsFromJSON 从 JSON 文件导入账号 func importAccountsFromJSON(filePath string) (int, error) { if database.Instance == nil { return 0, fmt.Errorf("数据库未初始化") } data, err := os.ReadFile(filePath) if err != nil { return 0, err } var accounts []TeamRegAccount if err := json.Unmarshal(data, &accounts); err != nil { return 0, err } // 转换为 database.TeamOwner 格式 var owners []database.TeamOwner for _, acc := range accounts { if acc.Account == "" || acc.Password == "" { continue } // 提取 account_id(去掉 org- 前缀如果有的话) accountID := acc.AccountID if strings.HasPrefix(accountID, "org-") { accountID = strings.TrimPrefix(accountID, "org-") } owners = append(owners, database.TeamOwner{ Email: acc.Account, Password: acc.Password, Token: acc.Token, AccountID: accountID, }) } // 批量导入 count, err := database.Instance.AddTeamOwners(owners) if err != nil { return 0, err } return count, nil }