Files
codexautopool/backend/internal/api/team_reg_exec.go

756 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package api
import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"os/exec"
"path/filepath"
"regexp"
"runtime"
"sort"
"strings"
"sync"
"time"
"codex-pool/internal/database"
"codex-pool/internal/logger"
)
// ansiRegex matches ANSI CSI escape sequences (ESC, '[', optional digits and
// semicolons, then one ASCII letter), e.g. terminal color codes.
var ansiRegex = regexp.MustCompile(`\x1b\[[0-9;]*[a-zA-Z]`)
// TeamRegConfig holds the parameters of one registration run. It is decoded
// from the JSON body of the start request.
type TeamRegConfig struct {
Count int `json:"count"` // number of accounts to register
Concurrency int `json:"concurrency"` // number of concurrent workers
Proxy string `json:"proxy"` // proxy address
AutoImport bool `json:"auto_import"` // import the results automatically when done
}
// TeamRegState is the runtime state of the team-reg task. The exported
// fields are the ones reported by the status endpoint; the unexported fields
// are internal bookkeeping. Fields are read and written while holding mu.
type TeamRegState struct {
Running bool `json:"running"`
StartedAt time.Time `json:"started_at"`
Config TeamRegConfig `json:"config"`
Logs []string `json:"logs"`
OutputFile string `json:"output_file"` // path of the generated JSON file
Imported int `json:"imported"` // number of accounts imported
mu sync.Mutex
cmd *exec.Cmd
cancel context.CancelFunc
stdin io.WriteCloser
// Auto-import and exit control.
autoImporting bool
autoImported bool
exitSignaled bool
// 403 error detection.
error403Count int // 403 errors counted in the current window
error403Start time.Time // start of the current counting window
}
// Thresholds for the automatic stop on repeated 403 errors.
const (
max403Errors = 10 // number of 403 errors within one window that triggers the stop
error403Window = 60 * time.Second // length of the counting window (60 seconds)
)
// teamRegState is the package-level state shared by all team-reg handlers
// and the background process goroutines. Logs starts as an empty, non-nil
// slice.
var teamRegState = &TeamRegState{
Logs: make([]string, 0),
}
// HandleTeamRegStart handles POST /api/team-reg/start and launches the
// registration process in the background.
//
// The JSON body is decoded into a TeamRegConfig; Count is clamped to 1..100
// and Concurrency to 1..10. If a task is already running the handler answers
// with {"success": false}. A malformed body yields 400 Bad Request.
//
// The request body is read WITHOUT holding the state mutex: reading from the
// network can block for as long as the client likes, and holding the lock
// during that time would stall every other handler and the log writer.
func HandleTeamRegStart(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	rejectBusy := func() {
		json.NewEncoder(w).Encode(map[string]interface{}{
			"success": false,
			"message": "已有注册任务在运行中",
		})
	}

	// Cheap early check so that a busy task is reported before the body is
	// even looked at (same precedence as before).
	teamRegState.mu.Lock()
	busy := teamRegState.Running
	teamRegState.mu.Unlock()
	if busy {
		rejectBusy()
		return
	}

	var config TeamRegConfig
	if err := json.NewDecoder(r.Body).Decode(&config); err != nil {
		http.Error(w, "Invalid request body", http.StatusBadRequest)
		return
	}

	// Clamp the parameters to their supported ranges.
	if config.Count < 1 {
		config.Count = 1
	}
	if config.Count > 100 {
		config.Count = 100
	}
	if config.Concurrency < 1 {
		config.Concurrency = 1
	}
	if config.Concurrency > 10 {
		config.Concurrency = 10
	}

	teamRegState.mu.Lock()
	// Re-check under the lock: another request may have started a task while
	// the body was being decoded.
	if teamRegState.Running {
		teamRegState.mu.Unlock()
		rejectBusy()
		return
	}

	// Reset the state for the new run.
	now := time.Now()
	teamRegState.Running = true
	teamRegState.StartedAt = now
	teamRegState.Config = config
	teamRegState.Logs = make([]string, 0)
	teamRegState.OutputFile = ""
	teamRegState.Imported = 0
	teamRegState.autoImporting = false
	teamRegState.autoImported = false
	teamRegState.exitSignaled = false
	teamRegState.error403Count = 0
	teamRegState.error403Start = now
	// Drop the handles of the previous run so that a stop request arriving
	// before the new process exists does not act on a finished process.
	teamRegState.cmd = nil
	teamRegState.cancel = nil
	teamRegState.stdin = nil
	teamRegState.mu.Unlock()

	// Launch the process in the background.
	go runTeamRegProcess(config)

	json.NewEncoder(w).Encode(map[string]interface{}{
		"success": true,
		"message": "注册任务已启动",
	})
}
// HandleTeamRegStop handles POST /api/team-reg/stop. When a task is running
// it cancels the task context, kills the child process if one exists, clears
// the Running flag and appends a log line; otherwise it reports that nothing
// is running.
func HandleTeamRegStop(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Do all state changes in one critical section and report whether a
	// task was actually stopped.
	stopped := func() bool {
		teamRegState.mu.Lock()
		defer teamRegState.mu.Unlock()

		if !teamRegState.Running {
			return false
		}
		// Signal cancellation first ...
		if abort := teamRegState.cancel; abort != nil {
			abort()
		}
		// ... then force-kill the process if it is still around.
		if child := teamRegState.cmd; child != nil && child.Process != nil {
			child.Process.Kill()
		}
		teamRegState.Running = false
		return true
	}()

	reply := map[string]interface{}{
		"success": false,
		"message": "没有正在运行的任务",
	}
	if stopped {
		// addTeamRegLog takes the mutex itself, so it must run after the
		// critical section above has ended.
		addTeamRegLog("[系统] 任务已被用户停止")
		reply = map[string]interface{}{
			"success": true,
			"message": "任务已停止",
		}
	}
	json.NewEncoder(w).Encode(reply)
}
// HandleTeamRegStatus handles GET /api/team-reg/status and returns the
// current task state (running flag, start time, config, logs, output file
// and import count) as a JSON object.
func HandleTeamRegStatus(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodGet {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	// Copy the fields out under the lock; encode after releasing it.
	snapshot := make(map[string]interface{}, 6)
	teamRegState.mu.Lock()
	snapshot["running"] = teamRegState.Running
	snapshot["started_at"] = teamRegState.StartedAt
	snapshot["config"] = teamRegState.Config
	snapshot["logs"] = teamRegState.Logs
	snapshot["output_file"] = teamRegState.OutputFile
	snapshot["imported"] = teamRegState.Imported
	teamRegState.mu.Unlock()

	json.NewEncoder(w).Encode(snapshot)
}
// HandleTeamRegLogs handles GET /api/team-reg/logs and streams log lines to
// the client as Server-Sent Events. The buffer is polled every 500 ms; each
// new line is sent as a "data:" event. When the task is no longer running a
// final "done" event is sent and the stream ends.
//
// The log buffer is not append-only: it is trimmed to its most recent lines
// once it grows past its cap and it can be cleared or replaced. A plain index
// cursor therefore stalls forever once the buffer is full (its length stops
// growing) and skips lines after a clear. The position is re-synchronised on
// every tick by unsentTeamRegLogs instead.
func HandleTeamRegLogs(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("Access-Control-Allow-Origin", "*")

	flusher, ok := w.(http.Flusher)
	if !ok {
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	var (
		sent     int    // length of the buffer at the previous tick
		lastSent string // last line delivered so far, used to re-sync
	)

	ticker := time.NewTicker(500 * time.Millisecond)
	defer ticker.Stop()

	for {
		select {
		case <-r.Context().Done():
			return
		case <-ticker.C:
			teamRegState.mu.Lock()
			running := teamRegState.Running
			logs := teamRegState.Logs
			teamRegState.mu.Unlock()

			// Deliver whatever has not been sent yet.
			if fresh := unsentTeamRegLogs(logs, sent, lastSent); len(fresh) > 0 {
				for _, line := range fresh {
					fmt.Fprintf(w, "data: %s\n\n", line)
				}
				flusher.Flush()
			}
			sent = len(logs)
			if sent > 0 {
				lastSent = logs[sent-1]
			}

			// Tell the client when the task has finished.
			if !running {
				fmt.Fprintf(w, "event: done\ndata: finished\n\n")
				flusher.Flush()
				return
			}
		}
	}
}

// unsentTeamRegLogs returns the tail of logs that has not been delivered yet.
// sent is the buffer length observed at the previous tick and lastSent the
// last line delivered then.
//
// If the buffer merely grew, the tail after index sent is returned. If it was
// trimmed or replaced, the last delivered line is searched from the end and
// everything after it is returned; when it cannot be found the whole buffer
// counts as new. Identical consecutive lines (same second, same text) cannot
// be told apart, so the re-synchronisation is best effort in that case.
func unsentTeamRegLogs(logs []string, sent int, lastSent string) []string {
	if sent == 0 {
		return logs
	}
	if sent <= len(logs) && logs[sent-1] == lastSent {
		return logs[sent:]
	}
	for i := len(logs) - 1; i >= 0; i-- {
		if logs[i] == lastSent {
			return logs[i+1:]
		}
	}
	return logs
}
// HandleTeamRegImport handles POST /api/team-reg/import. It imports the
// accounts from the recorded output file into the database and stores the
// resulting count in the shared state. The outcome is reported as JSON with
// a "success" flag and a "message"; on success a "count" is included too.
func HandleTeamRegImport(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
		return
	}

	respond := func(body map[string]interface{}) {
		json.NewEncoder(w).Encode(body)
	}

	teamRegState.mu.Lock()
	path := teamRegState.OutputFile
	teamRegState.mu.Unlock()

	if path == "" {
		respond(map[string]interface{}{
			"success": false,
			"message": "没有可导入的文件",
		})
		return
	}

	imported, err := importAccountsFromJSON(path)
	if err != nil {
		respond(map[string]interface{}{
			"success": false,
			"message": fmt.Sprintf("导入失败: %v", err),
		})
		return
	}

	teamRegState.mu.Lock()
	teamRegState.Imported = imported
	teamRegState.mu.Unlock()

	respond(map[string]interface{}{
		"success": true,
		"message": fmt.Sprintf("成功导入 %d 个账号", imported),
		"count":   imported,
	})
}
// runTeamRegProcess runs the external team-reg executable for one task.
//
// It locates the executable, makes it executable on non-Windows systems,
// starts it with its own directory as working directory, feeds count,
// concurrency and proxy to its stdin (one value per line, with short pauses),
// streams stdout/stderr into the task log and waits for it to exit.
// Afterwards it looks for the newest output file and, if config.AutoImport
// is set, imports it. Running is cleared when the function returns.
//
// Changes compared to the straightforward version:
//   - the context is always released (defer cancel), including on the early
//     error returns;
//   - cmd.Wait is only called after both pipe readers have finished, as
//     required by os/exec for StdoutPipe/StderrPipe, so trailing output is
//     not lost when Wait closes the pipes;
//   - failed writes to the child's stdin are logged instead of ignored.
func runTeamRegProcess(config TeamRegConfig) {
	defer func() {
		teamRegState.mu.Lock()
		teamRegState.Running = false
		teamRegState.mu.Unlock()
	}()

	// Locate the team-reg executable.
	execPath := findTeamRegExecutable()
	if execPath == "" {
		addTeamRegLog("[错误] 找不到 team-reg 可执行文件")
		addTeamRegLog("[提示] 请确保 team-reg 文件位于 backend 目录下")
		return
	}
	addTeamRegLog(fmt.Sprintf("[系统] 找到可执行文件: %s", execPath))

	// On Linux/macOS make sure the file is executable.
	if runtime.GOOS != "windows" {
		if err := os.Chmod(execPath, 0755); err != nil {
			addTeamRegLog(fmt.Sprintf("[警告] 设置执行权限失败: %v", err))
		} else {
			addTeamRegLog("[系统] 已设置执行权限 (chmod +x)")
		}
	}

	addTeamRegLog(fmt.Sprintf("[系统] 配置: 数量=%d, 并发=%d, 代理=%s",
		config.Count, config.Concurrency, config.Proxy))

	// Context used to cancel (kill) the process.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // release the context on every exit path
	teamRegState.mu.Lock()
	teamRegState.cancel = cancel
	teamRegState.mu.Unlock()

	cmd := exec.CommandContext(ctx, execPath)

	// The process writes its output file into its working directory.
	workDir := filepath.Dir(execPath)
	cmd.Dir = workDir

	stdin, err := cmd.StdinPipe()
	if err != nil {
		addTeamRegLog(fmt.Sprintf("[错误] 无法获取 stdin: %v", err))
		return
	}
	stdout, err := cmd.StdoutPipe()
	if err != nil {
		addTeamRegLog(fmt.Sprintf("[错误] 无法获取 stdout: %v", err))
		return
	}
	stderr, err := cmd.StderrPipe()
	if err != nil {
		addTeamRegLog(fmt.Sprintf("[错误] 无法获取 stderr: %v", err))
		return
	}

	teamRegState.mu.Lock()
	teamRegState.cmd = cmd
	teamRegState.stdin = stdin
	teamRegState.mu.Unlock()

	addTeamRegLog("[系统] 启动 team-reg 进程...")
	if err := cmd.Start(); err != nil {
		addTeamRegLog(fmt.Sprintf("[错误] 启动失败: %v", err))
		return
	}

	// Read stdout and stderr concurrently into the same log.
	var readers sync.WaitGroup
	readers.Add(2)
	go func() {
		defer readers.Done()
		readOutput(stdout, workDir, config)
	}()
	go func() {
		defer readers.Done()
		readOutput(stderr, workDir, config)
	}()

	// feed writes one formatted line to the child's stdin and reports a
	// failed write (e.g. the child already exited) in the task log.
	feed := func(format string, value interface{}) {
		if _, werr := fmt.Fprintf(stdin, format, value); werr != nil {
			addTeamRegLog(fmt.Sprintf("[警告] 写入 stdin 失败: %v", werr))
		}
	}

	// Give the program a moment to start before answering its prompts.
	time.Sleep(500 * time.Millisecond)

	addTeamRegLog(fmt.Sprintf("[输入] 注册数量: %d", config.Count))
	feed("%d\n", config.Count)
	time.Sleep(200 * time.Millisecond)

	addTeamRegLog(fmt.Sprintf("[输入] 并发线程数: %d", config.Concurrency))
	feed("%d\n", config.Concurrency)
	time.Sleep(200 * time.Millisecond)

	addTeamRegLog(fmt.Sprintf("[输入] 代理地址: %s", config.Proxy))
	feed("%s\n", config.Proxy)

	// Wait closes the pipes, so all reads must be finished before it is
	// called. The readers end when the child closes its output (normally on
	// exit, or when it is killed through the context or a stop request).
	readers.Wait()
	err = cmd.Wait()
	if err != nil {
		if ctx.Err() == context.Canceled {
			addTeamRegLog("[系统] 进程已被取消")
		} else {
			addTeamRegLog(fmt.Sprintf("[系统] 进程退出: %v", err))
		}
	} else {
		addTeamRegLog("[系统] 进程正常完成")
	}

	// Look for the output file produced by this run.
	outputFile := findLatestOutputFile(workDir)
	if outputFile != "" {
		teamRegState.mu.Lock()
		teamRegState.OutputFile = outputFile
		teamRegState.mu.Unlock()
		addTeamRegLog(fmt.Sprintf("[系统] 输出文件: %s", filepath.Base(outputFile)))

		if config.AutoImport {
			addTeamRegLog("[系统] 自动导入账号到数据库...")
			tryAutoImport(outputFile, config)
		}
	}

	// Send the final newline / close stdin if that has not happened yet.
	time.Sleep(500 * time.Millisecond)
	signalTeamRegExit()
}
// readOutput copies the child's output, line by line, into the task log and
// reacts to two kinds of lines:
//
//   - a line naming an output file ("accounts-….json", e.g.
//     "结果已保存到: accounts-2-20260201-071558.json") records the file as
//     the task's output file, starts the auto-import when config.AutoImport
//     is set, and signals the child to exit;
//   - a line containing "403" is counted by check403AndStop; reading stops
//     once that function reports that the task was stopped.
//
// Blank lines are skipped. workDir is the directory the file name is
// resolved against.
//
// Unlike a bare bufio.Scanner loop, the scanner accepts lines up to 1 MiB and
// a scan failure is handled: it is logged and the rest of the stream is
// drained so the child can never block on a full pipe.
func readOutput(reader io.Reader, workDir string, config TeamRegConfig) {
	scanner := bufio.NewScanner(reader)
	// The default limit is 64 KiB per line; a longer line would end the scan.
	scanner.Buffer(make([]byte, 0, 64*1024), 1024*1024)

	for scanner.Scan() {
		trimmed := strings.TrimSpace(scanner.Text())
		if trimmed == "" {
			continue
		}
		addTeamRegLog(trimmed)

		if fileName := teamRegOutputFileName(trimmed); fileName != "" {
			fullPath := filepath.Join(workDir, fileName)

			teamRegState.mu.Lock()
			teamRegState.OutputFile = fullPath
			teamRegState.mu.Unlock()

			if config.AutoImport {
				go tryAutoImport(fullPath, config)
			}
			// Some builds wait for Enter after finishing; nudge them.
			signalTeamRegExit()
		}

		// NOTE(review): any line containing "403" counts, including ones
		// where it is part of an unrelated number — confirm this is intended.
		if strings.Contains(trimmed, "403") && check403AndStop() {
			return // automatic stop was triggered
		}
	}

	if err := scanner.Err(); err != nil {
		// A pipe closed by cmd.Wait after the process exited is a normal
		// end of stream, not a failure worth reporting.
		pathErr, isPathErr := err.(*os.PathError)
		if !isPathErr || pathErr.Err != os.ErrClosed {
			addTeamRegLog(fmt.Sprintf("[警告] 读取进程输出失败: %v", err))
		}
		// Keep consuming so the child does not block on a full pipe.
		_, _ = io.Copy(io.Discard, reader)
	}
}

// teamRegOutputFileName extracts an output file name of the form
// "accounts-….json" from line: the text from the first "accounts-" up to and
// including the first ".json" after it. It returns "" if there is none.
func teamRegOutputFileName(line string) string {
	start := strings.Index(line, "accounts-")
	if start < 0 {
		return ""
	}
	end := strings.Index(line[start:], ".json")
	if end < 0 {
		return ""
	}
	return line[start : start+end+len(".json")]
}
// check403AndStop records one 403 error and stops the task once
// max403Errors of them have been counted within error403Window. The counter
// and the window start are reset when the window has elapsed.
//
// It returns true when the stop was triggered: the context is cancelled, the
// process is killed and Running is cleared.
func check403AndStop() bool {
	teamRegState.mu.Lock()
	defer teamRegState.mu.Unlock()

	// Start a fresh window if the current one has expired.
	if current := time.Now(); current.Sub(teamRegState.error403Start) > error403Window {
		teamRegState.error403Count, teamRegState.error403Start = 0, current
	}

	teamRegState.error403Count++
	if teamRegState.error403Count < max403Errors {
		return false
	}

	// Threshold reached: stop the task.
	if abort := teamRegState.cancel; abort != nil {
		abort()
	}
	if child := teamRegState.cmd; child != nil && child.Process != nil {
		child.Process.Kill()
	}
	teamRegState.Running = false

	// addTeamRegLog takes the same mutex, which is still held here (it is
	// released by the deferred Unlock), so log from a separate goroutine.
	go func() {
		addTeamRegLog(fmt.Sprintf("[警告] 检测到 %d 次 403 错误,自动停止注册", max403Errors))
		addTeamRegLog("[提示] 可能是代理 IP 被限制,请更换代理后重试")
	}()
	return true
}
// tryAutoImport imports filePath into the database at most once per task.
//
// It does nothing when auto-import is disabled, filePath is empty, or an
// import has already succeeded or is in progress. The import is attempted up
// to five times, 300 ms apart, to give the file time to be fully written.
// On success the import count is stored and the file is deleted; on failure
// the last error is logged.
func tryAutoImport(filePath string, config TeamRegConfig) {
	if filePath == "" || !config.AutoImport {
		return
	}

	// Claim the import; bail out if someone else has it or it is done.
	teamRegState.mu.Lock()
	taken := teamRegState.autoImported || teamRegState.autoImporting
	if !taken {
		teamRegState.autoImporting = true
	}
	teamRegState.mu.Unlock()
	if taken {
		return
	}

	const attempts = 5
	var (
		imported int
		lastErr  error
	)
	for attempt := 0; attempt < attempts; attempt++ {
		// Only try to import once the file exists.
		if _, lastErr = os.Stat(filePath); lastErr == nil {
			imported, lastErr = importAccountsFromJSON(filePath)
		}
		if lastErr == nil {
			break
		}
		time.Sleep(300 * time.Millisecond)
	}

	succeeded := lastErr == nil
	teamRegState.mu.Lock()
	teamRegState.autoImporting = false
	if succeeded {
		teamRegState.Imported = imported
		teamRegState.autoImported = true
	}
	teamRegState.mu.Unlock()

	if !succeeded {
		addTeamRegLog(fmt.Sprintf("[错误] 导入失败: %v", lastErr))
		return
	}
	addTeamRegLog(fmt.Sprintf("[系统] 成功导入 %d 个账号", imported))

	// The JSON file is only a hand-over artifact; remove it after import.
	if rmErr := os.Remove(filePath); rmErr != nil {
		addTeamRegLog(fmt.Sprintf("[警告] 删除临时文件失败: %v", rmErr))
		return
	}
	addTeamRegLog(fmt.Sprintf("[系统] 已清理临时文件: %s", filepath.Base(filePath)))
}
// signalTeamRegExit asks the child process to exit by writing a newline to
// its stdin and closing it. Only the first call per task has any effect;
// later calls return immediately.
func signalTeamRegExit() {
	var pipe io.WriteCloser

	teamRegState.mu.Lock()
	if !teamRegState.exitSignaled {
		teamRegState.exitSignaled = true
		pipe = teamRegState.stdin
	}
	teamRegState.mu.Unlock()

	// pipe is nil both when the signal was already sent and when no stdin
	// has been recorded.
	if pipe == nil {
		return
	}
	_, _ = fmt.Fprintln(pipe)
	_ = pipe.Close()
}
// addTeamRegLog appends one line to the task log and mirrors it to the
// system logger. ANSI color codes are removed first; a line that is empty
// afterwards is dropped. Each stored entry is prefixed with the current time
// as "[HH:MM:SS] ", and only the most recent 1000 entries are kept.
func addTeamRegLog(log string) {
	const maxEntries = 1000

	teamRegState.mu.Lock()
	defer teamRegState.mu.Unlock()

	stripped := ansiRegex.ReplaceAllString(log, "")
	if stripped == "" {
		return
	}

	entry := "[" + time.Now().Format("15:04:05") + "] " + stripped
	entries := append(teamRegState.Logs, entry)
	if overflow := len(entries) - maxEntries; overflow > 0 {
		entries = entries[overflow:]
	}
	teamRegState.Logs = entries

	// Mirror the (untimestamped) line to the system log as well.
	logger.Info("[TeamReg] "+stripped, "", "team-reg")
}
// HandleTeamRegClearLogs POST /api/team-reg/clear-logs - 清除日志
func HandleTeamRegClearLogs(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
Error(w, http.StatusMethodNotAllowed, "仅支持 POST")
return
}
teamRegState.mu.Lock()
teamRegState.Logs = make([]string, 0)
teamRegState.mu.Unlock()
logger.Info("[TeamReg] 日志已清除", "", "team-reg")
Success(w, map[string]string{"message": "日志已清除"})
}
// findTeamRegExecutable returns the absolute path of the team-reg
// executable, or "" if it cannot be found.
//
// It looks for "team-reg" and "team-reg.exe" (the .exe name first on
// Windows) in a list of candidate directories, in priority order: the
// directory of the running binary (symlinks resolved), the working
// directory, and a number of "backend" and parent directories around them.
//
// Only non-directory entries are accepted: a directory that happens to be
// called "team-reg" (for instance a source folder) is skipped instead of
// being returned as the executable.
func findTeamRegExecutable() string {
	// Candidate file names, platform-native one first.
	names := []string{"team-reg", "team-reg.exe"}
	if runtime.GOOS == "windows" {
		names = []string{"team-reg.exe", "team-reg"}
	}

	// Lookup failures are tolerated here: an empty cwd/execPath simply turns
	// the derived candidates into relative paths, which are covered anyway.
	cwd, _ := os.Getwd()
	execPath, _ := os.Executable()
	if resolved, err := filepath.EvalSymlinks(execPath); err == nil {
		execPath = resolved
	}
	execDir := filepath.Dir(execPath)

	// Candidate directories, most reliable first.
	paths := []string{
		execDir,                              // next to the running binary
		cwd,                                  // working directory
		filepath.Join(cwd, "backend"),        // cwd/backend
		filepath.Join(execDir, ".."),         // parent of the binary's directory
		filepath.Join(execDir, "backend"),    // execDir/backend
		".",                                  // relative: current directory
		"backend",                            // relative: backend
		"..",                                 // relative: parent
		filepath.Join("..", "backend"),       // ../backend
		filepath.Join("..", ".."),            // two levels up
		filepath.Join("..", "..", "backend"), // ../../backend
	}

	for _, basePath := range paths {
		for _, name := range names {
			absPath, err := filepath.Abs(filepath.Join(basePath, name))
			if err != nil {
				continue
			}
			info, err := os.Stat(absPath)
			if err != nil || info.IsDir() {
				continue
			}
			return absPath
		}
	}
	return ""
}
// findLatestOutputFile returns the absolute path of the most recently
// modified "accounts-*.json" file found in primaryDir, the working
// directory, "/app", "/app/data", "<cwd>/data" or ".". It returns "" when no
// such file exists or when the newest one was modified more than five
// minutes ago (so results of earlier runs are not picked up); both cases are
// reported in the task log.
//
// Every candidate is stat'ed exactly once. Entries that cannot be resolved
// or stat'ed, and directories, are skipped, so one unreadable file neither
// distorts the ordering nor hides the other candidates.
func findLatestOutputFile(primaryDir string) string {
	// A failed Getwd leaves cwd empty, which only makes the cwd-based
	// entries relative; the search still works.
	cwd, _ := os.Getwd()
	searchDirs := []string{
		primaryDir,
		cwd,
		"/app",
		"/app/data",
		filepath.Join(cwd, "data"),
		".",
	}

	type candidate struct {
		path    string
		modTime time.Time
	}

	seen := make(map[string]struct{})
	var candidates []candidate
	for _, dir := range searchDirs {
		matches, err := filepath.Glob(filepath.Join(dir, "accounts-*.json"))
		if err != nil {
			continue
		}
		for _, match := range matches {
			absPath, err := filepath.Abs(match)
			if err != nil {
				continue
			}
			// The search directories overlap; keep each file once.
			if _, dup := seen[absPath]; dup {
				continue
			}
			seen[absPath] = struct{}{}

			info, err := os.Stat(absPath)
			if err != nil || info.IsDir() {
				continue
			}
			candidates = append(candidates, candidate{path: absPath, modTime: info.ModTime()})
		}
	}

	if len(candidates) == 0 {
		addTeamRegLog(fmt.Sprintf("[调试] 未找到 accounts-*.json搜索目录: %v", searchDirs))
		return ""
	}

	// Newest first.
	sort.Slice(candidates, func(i, j int) bool {
		return candidates[i].modTime.After(candidates[j].modTime)
	})
	newest := candidates[0]

	// Only accept a file written recently (within five minutes).
	if age := time.Since(newest.modTime); age > 5*time.Minute {
		addTeamRegLog(fmt.Sprintf("[调试] 找到文件但太旧: %s (修改于 %v 前)", newest.path, age))
		return ""
	}
	return newest.path
}
// TeamRegAccount is one account entry in the JSON file written by team-reg.
// importAccountsFromJSON decodes the file as an array of these entries.
type TeamRegAccount struct {
Account string `json:"account"`
Password string `json:"password"`
Token string `json:"token"`
AccountID string `json:"account_id"`
PlanType string `json:"plan_type"`
}
// importAccountsFromJSON reads a team-reg result file (a JSON array of
// TeamRegAccount) and adds the accounts to the database as team owners.
//
// Entries without an account name or password are skipped, and a leading
// "org-" is removed from the account ID. It returns the count reported by
// the database, or an error if the database is not initialised, the file
// cannot be read or parsed, or the insert fails.
func importAccountsFromJSON(filePath string) (int, error) {
	if database.Instance == nil {
		return 0, fmt.Errorf("数据库未初始化")
	}

	raw, err := os.ReadFile(filePath)
	if err != nil {
		return 0, err
	}

	var parsed []TeamRegAccount
	if err = json.Unmarshal(raw, &parsed); err != nil {
		return 0, err
	}

	// Convert the usable entries to database.TeamOwner values.
	var owners []database.TeamOwner
	for i := range parsed {
		entry := &parsed[i]
		if entry.Account == "" || entry.Password == "" {
			continue
		}
		owners = append(owners, database.TeamOwner{
			Email:    entry.Account,
			Password: entry.Password,
			Token:    entry.Token,
			// TrimPrefix is a no-op when the prefix is absent.
			AccountID: strings.TrimPrefix(entry.AccountID, "org-"),
		})
	}

	// Insert everything in one batch.
	imported, err := database.Instance.AddTeamOwners(owners)
	if err != nil {
		return 0, err
	}
	return imported, nil
}