From 165c6d69b9678ae6e3c857345941171a582df245 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Fri, 30 Jan 2026 18:15:50 +0800 Subject: [PATCH] feat: Implement core backend services for team owner management, SQLite persistence, and logging, alongside frontend monitoring and record views. --- backend/cmd/main.go | 83 ++++++++ backend/internal/api/team_process.go | 221 ++++++++++++------- backend/internal/database/sqlite.go | 165 +++++++++++++- backend/internal/logger/logger.go | 45 +++- frontend/src/components/LiveLogViewer.tsx | 204 ++++++++++++++++++ frontend/src/pages/Monitor.tsx | 3 + frontend/src/pages/Records.tsx | 248 ++++++++++++++++++---- 7 files changed, 844 insertions(+), 125 deletions(-) create mode 100644 frontend/src/components/LiveLogViewer.tsx diff --git a/backend/cmd/main.go b/backend/cmd/main.go index 17e7af1..2ac3e41 100644 --- a/backend/cmd/main.go +++ b/backend/cmd/main.go @@ -92,6 +92,7 @@ func startServer(cfg *config.Config) { // 日志 API mux.HandleFunc("/api/logs", api.CORS(handleGetLogs)) mux.HandleFunc("/api/logs/clear", api.CORS(handleClearLogs)) + mux.HandleFunc("/api/logs/stream", handleLogStream) // SSE 实时日志 // S2A 代理 API mux.HandleFunc("/api/s2a/test", api.CORS(handleS2ATest)) @@ -117,6 +118,10 @@ func startServer(cfg *config.Config) { mux.HandleFunc("/api/team/status", api.CORS(api.HandleTeamProcessStatus)) mux.HandleFunc("/api/team/stop", api.CORS(api.HandleTeamProcessStop)) + // 批次记录 API + mux.HandleFunc("/api/batch/runs", api.CORS(handleBatchRuns)) + mux.HandleFunc("/api/batch/stats", api.CORS(handleBatchStats)) + // 监控设置 API mux.HandleFunc("/api/monitor/settings", api.CORS(api.HandleGetMonitorSettings)) mux.HandleFunc("/api/monitor/settings/save", api.CORS(api.HandleSaveMonitorSettings)) @@ -252,6 +257,84 @@ func handleClearLogs(w http.ResponseWriter, r *http.Request) { api.Success(w, map[string]string{"message": "日志已清空"}) } +// handleBatchRuns 获取批次运行记录 +func handleBatchRuns(w http.ResponseWriter, r *http.Request) { + if database.Instance == nil { + api.Error(w, http.StatusInternalServerError, "数据库未初始化") + return + } + + runs, err := database.Instance.GetBatchRuns(50) + if err != nil { + api.Error(w, http.StatusInternalServerError, fmt.Sprintf("获取记录失败: %v", err)) + return + } + + api.Success(w, runs) +} + +// handleBatchStats 获取批次统计 +func handleBatchStats(w http.ResponseWriter, r *http.Request) { + if database.Instance == nil { + api.Error(w, http.StatusInternalServerError, "数据库未初始化") + return + } + + stats := database.Instance.GetBatchRunStats() + api.Success(w, stats) +} + +// handleLogStream SSE 实时日志流 +func handleLogStream(w http.ResponseWriter, r *http.Request) { + // 设置 SSE 响应头 + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("Access-Control-Allow-Origin", "*") + + // 生成唯一 ID + listenerID := fmt.Sprintf("client-%d", time.Now().UnixNano()) + + // 订阅日志 + logChan := logger.AddListener(listenerID) + defer logger.RemoveListener(listenerID) + + // 获取 flusher + flusher, ok := w.(http.Flusher) + if !ok { + http.Error(w, "SSE not supported", http.StatusInternalServerError) + return + } + + // 发送初始连接确认 + fmt.Fprintf(w, "data: {\"type\":\"connected\",\"id\":\"%s\"}\n\n", listenerID) + flusher.Flush() + + // 监听日志和连接关闭 + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case entry, ok := <-logChan: + if !ok { + return + } + // 发送日志条目 + data, _ := json.Marshal(map[string]interface{}{ + "type": "log", + "timestamp": entry.Timestamp.Format("15:04:05"), + "level": entry.Level, + "message": entry.Message, + "email": entry.Email, + "module": entry.Module, + }) + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() + } + } +} + func handleS2ATest(w http.ResponseWriter, r *http.Request) { if config.Global == nil || config.Global.S2AApiBase == "" { api.Error(w, http.StatusBadRequest, "S2A 配置未设置") diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index cc02352..96289c1 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "net/http" - "strings" "sync" "sync/atomic" "time" @@ -187,7 +186,17 @@ func runTeamProcess(req TeamProcessRequest) { workerCount = 2 // 默认 2 个并发 } - logger.Info(fmt.Sprintf("Starting Team process: %d owners, %d concurrent workers", totalOwners, workerCount), "", "team") + // 创建批次记录 + var batchID int64 + if database.Instance != nil { + var err error + batchID, err = database.Instance.CreateBatchRun(totalOwners) + if err != nil { + logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team") + } + } + + logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team") // 任务队列 taskChan := make(chan int, totalOwners) @@ -228,13 +237,42 @@ func runTeamProcess(req TeamProcessRequest) { close(resultChan) }() + // 统计总数 + var totalRegistered, totalAddedToS2A int + var allErrors []string + for result := range resultChan { teamProcessState.mu.Lock() teamProcessState.Results = append(teamProcessState.Results, result) teamProcessState.mu.Unlock() + + totalRegistered += result.Registered + totalAddedToS2A += result.AddedToS2A + allErrors = append(allErrors, result.Errors...) } - logger.Success(fmt.Sprintf("Team process complete: %d/%d teams processed", teamProcessState.Completed, totalOwners), "", "team") + // 更新批次记录 + if database.Instance != nil && batchID > 0 { + errorsStr := "" + if len(allErrors) > 0 { + // 只保留前10条错误 + if len(allErrors) > 10 { + allErrors = allErrors[:10] + } + errorsStr = fmt.Sprintf("%v", allErrors) + } + database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr) + } + + // 计算成功率 + expectedTotal := totalOwners * req.MembersPerTeam + successRate := float64(0) + if expectedTotal > 0 { + successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100 + } + + logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%", + teamProcessState.Completed, totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team") } // processSingleTeam 处理单个 Team @@ -249,7 +287,24 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } logPrefix := fmt.Sprintf("[Team %d]", idx+1) - logger.Info(fmt.Sprintf("%s Starting with owner: %s", logPrefix, owner.Email), owner.Email, "team") + logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team") + + // 标记 owner 为处理中 + if database.Instance != nil { + database.Instance.MarkOwnerAsProcessing(owner.Email) + } + + // 处理失败时的清理函数 + markOwnerResult := func(success bool) { + if database.Instance != nil { + if success { + database.Instance.MarkOwnerAsUsed(owner.Email) + } else { + // 失败时恢复为 valid,允许重试 + database.Instance.MarkOwnerAsFailed(owner.Email) + } + } + } // Step 1: 获取 Team ID(优先使用已存储的 account_id) var teamID string @@ -268,92 +323,102 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err)) result.DurationMs = time.Since(startTime).Milliseconds() logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team") + markOwnerResult(false) return result } logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team") } result.TeamID = teamID - // Step 2: 生成成员邮箱并发送邀请 + // Step 2: 并发注册成员 + // 每个成员:邀请 → 注册,失败重试1次 + // Team 有4次额外补救机会 type MemberAccount struct { Email string Password string Success bool } children := make([]MemberAccount, req.MembersPerTeam) - for i := 0; i < req.MembersPerTeam; i++ { - children[i].Email = mail.GenerateEmail() - children[i].Password = register.GeneratePassword() - logger.Info(fmt.Sprintf("%s [Member %d] Email: %s", logPrefix, i+1, children[i].Email), children[i].Email, "team") - } - - // 发送邀请 - inviteEmails := make([]string, req.MembersPerTeam) - for i, c := range children { - inviteEmails[i] = c.Email - } - if err := inviter.SendInvites(inviteEmails); err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("发送邀请失败: %v", err)) - result.DurationMs = time.Since(startTime).Milliseconds() - return result - } - logger.Success(fmt.Sprintf("%s Sent %d invite(s)", logPrefix, len(inviteEmails)), owner.Email, "team") - - // Step 3: 并发注册成员 + var memberMu sync.Mutex var memberWg sync.WaitGroup - memberMutex := sync.Mutex{} - for i := range children { - memberWg.Add(1) - go func(memberIdx int) { - defer memberWg.Done() + // 注册单个成员的函数(带1次重试) + registerMember := func(memberIdx int, email, password string) bool { + name := register.GenerateName() + birthdate := register.GenerateBirthdate() - memberMutex.Lock() - email := children[memberIdx].Email - password := children[memberIdx].Password - memberMutex.Unlock() - - name := register.GenerateName() - birthdate := register.GenerateBirthdate() - - // 重试逻辑 - for attempt := 0; attempt < 3; attempt++ { - if !teamProcessState.Running { - return - } - - if attempt > 0 { - email = mail.GenerateEmail() - password = register.GeneratePassword() - logger.Info(fmt.Sprintf("%s [Member %d] Retry %d: %s", logPrefix, memberIdx+1, attempt, email), email, "team") - - if err := inviter.SendInvites([]string{email}); err != nil { - continue - } - } - - _, err := registerWithTimeout(email, password, name, birthdate, req.Proxy) - if err != nil { - if strings.Contains(err.Error(), "验证码") { - continue - } - result.Errors = append(result.Errors, fmt.Sprintf("Member %d: %v", memberIdx+1, err)) - return - } - - memberMutex.Lock() - children[memberIdx].Email = email - children[memberIdx].Password = password - children[memberIdx].Success = true - memberMutex.Unlock() - - logger.Success(fmt.Sprintf("%s [Member %d] Registered", logPrefix, memberIdx+1), email, "team") - return + for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次(首次+1次重试) + if !teamProcessState.Running { + return false } + + currentEmail := email + currentPassword := password + if attempt > 0 { + // 重试时使用新邮箱 + currentEmail = mail.GenerateEmail() + currentPassword = register.GeneratePassword() + logger.Warning(fmt.Sprintf("%s [成员 %d] 重试,新邮箱: %s", logPrefix, memberIdx+1, currentEmail), currentEmail, "team") + } + + // 发送邀请 + if err := inviter.SendInvites([]string{currentEmail}); err != nil { + logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team") + continue + } + + // 注册 + _, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy) + if err != nil { + logger.Error(fmt.Sprintf("%s [成员 %d] 注册失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team") + continue + } + + // 成功 + memberMu.Lock() + children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true} + memberMu.Unlock() + logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 注册成功", logPrefix, memberIdx+1), currentEmail, "team") + return true + } + return false + } + + // 第一轮:并发注册4个成员 + logger.Info(fmt.Sprintf("%s 开始并发注册 %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team") + for i := 0; i < req.MembersPerTeam; i++ { + memberWg.Add(1) + go func(idx int) { + defer memberWg.Done() + email := mail.GenerateEmail() + password := register.GeneratePassword() + logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team") + registerMember(idx, email, password) }(i) } memberWg.Wait() + // 统计失败的成员 + failedSlots := make([]int, 0) + for i, c := range children { + if !c.Success { + failedSlots = append(failedSlots, i) + } + } + + // 第二轮:Team 有 4 次额外补救机会 + teamRetries := 4 + for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running; retry++ { + slotIdx := failedSlots[0] + logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team") + + email := mail.GenerateEmail() + password := register.GeneratePassword() + if registerMember(slotIdx, email, password) { + failedSlots = failedSlots[1:] // 成功,移除这个槽位 + } + } + // 统计注册成功数 registeredChildren := make([]MemberAccount, 0) for _, c := range children { @@ -363,7 +428,11 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { result.Registered++ } } - logger.Info(fmt.Sprintf("%s Registered: %d/%d", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team") + + if len(failedSlots) > 0 { + result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots))) + } + logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team") // Step 4: S2A 授权入库(成员) for i, child := range registeredChildren { @@ -407,7 +476,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } result.AddedToS2A++ - logger.Success(fmt.Sprintf("%s [Member %d] Added to S2A", logPrefix, i+1), child.Email, "team") + logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team") } // Step 5: 母号也入库(如果开启) @@ -442,14 +511,18 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { result.Errors = append(result.Errors, fmt.Sprintf("Owner S2A: %v", err)) } else { result.AddedToS2A++ - logger.Success(fmt.Sprintf("%s [Owner] Added to S2A", logPrefix), owner.Email, "team") + logger.Success(fmt.Sprintf("%s [母号] ✓ 入库成功", logPrefix), owner.Email, "team") } } } } result.DurationMs = time.Since(startTime).Milliseconds() - logger.Success(fmt.Sprintf("%s Complete: %d registered, %d in S2A", logPrefix, result.Registered, result.AddedToS2A), owner.Email, "team") + logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team") + + // 根据入库结果标记 owner 状态 + // 只要有任何一个账号入库成功,就标记为已使用 + markOwnerResult(result.AddedToS2A > 0) return result } diff --git a/backend/internal/database/sqlite.go b/backend/internal/database/sqlite.go index 76a60c3..3334c7b 100644 --- a/backend/internal/database/sqlite.go +++ b/backend/internal/database/sqlite.go @@ -67,6 +67,22 @@ func (d *DB) createTables() error { value TEXT NOT NULL, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); + + -- 批次运行记录表 + CREATE TABLE IF NOT EXISTS batch_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + started_at DATETIME NOT NULL, + finished_at DATETIME, + total_owners INTEGER DEFAULT 0, + total_registered INTEGER DEFAULT 0, + total_added_to_s2a INTEGER DEFAULT 0, + success_rate REAL DEFAULT 0, + duration_seconds INTEGER DEFAULT 0, + status TEXT DEFAULT 'running', + errors TEXT + ); + + CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at); `) return err } @@ -195,7 +211,7 @@ func (d *DB) GetTeamOwners(status string, limit, offset int) ([]TeamOwner, int, return owners, total, nil } -// GetPendingOwners 获取待处理 +// GetPendingOwners 获取待处理(排除已使用和处理中的) func (d *DB) GetPendingOwners() ([]TeamOwner, error) { rows, err := d.db.Query(` SELECT id, email, password, token, account_id, status, created_at @@ -219,6 +235,24 @@ func (d *DB) GetPendingOwners() ([]TeamOwner, error) { return owners, nil } +// MarkOwnerAsUsed 标记 Owner 为已使用 +func (d *DB) MarkOwnerAsUsed(email string) error { + _, err := d.db.Exec("UPDATE team_owners SET status = 'used' WHERE email = ?", email) + return err +} + +// MarkOwnerAsProcessing 标记 Owner 为处理中 +func (d *DB) MarkOwnerAsProcessing(email string) error { + _, err := d.db.Exec("UPDATE team_owners SET status = 'processing' WHERE email = ?", email) + return err +} + +// MarkOwnerAsFailed 标记 Owner 为失败(可重试) +func (d *DB) MarkOwnerAsFailed(email string) error { + _, err := d.db.Exec("UPDATE team_owners SET status = 'valid' WHERE email = ?", email) + return err +} + // UpdateOwnerStatus 更新状态 func (d *DB) UpdateOwnerStatus(id int64, status string) error { _, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id) @@ -293,6 +327,135 @@ func (d *DB) GetOwnerStats() map[string]int { return stats } +// BatchRun 批次运行记录 +type BatchRun struct { + ID int64 `json:"id"` + StartedAt time.Time `json:"started_at"` + FinishedAt time.Time `json:"finished_at,omitempty"` + TotalOwners int `json:"total_owners"` + TotalRegistered int `json:"total_registered"` + TotalAddedToS2A int `json:"total_added_to_s2a"` + SuccessRate float64 `json:"success_rate"` + DurationSeconds int `json:"duration_seconds"` + Status string `json:"status"` + Errors string `json:"errors,omitempty"` +} + +// CreateBatchRun 创建批次记录 +func (d *DB) CreateBatchRun(totalOwners int) (int64, error) { + result, err := d.db.Exec( + `INSERT INTO batch_runs (started_at, total_owners, status) VALUES (?, ?, 'running')`, + time.Now(), totalOwners, + ) + if err != nil { + return 0, err + } + return result.LastInsertId() +} + +// UpdateBatchRun 更新批次记录 +func (d *DB) UpdateBatchRun(id int64, registered, addedToS2A int, errors string) error { + finishedAt := time.Now() + + // 获取开始时间计算耗时 + var startedAt time.Time + var totalOwners int + d.db.QueryRow("SELECT started_at, total_owners FROM batch_runs WHERE id = ?", id).Scan(&startedAt, &totalOwners) + + duration := int(finishedAt.Sub(startedAt).Seconds()) + + // 计算成功率(以入库数为准) + var successRate float64 + expectedTotal := totalOwners * 4 // 每个 owner 应产生 4 个成员 + if expectedTotal > 0 { + successRate = float64(addedToS2A) / float64(expectedTotal) * 100 + } + + _, err := d.db.Exec( + `UPDATE batch_runs SET + finished_at = ?, + total_registered = ?, + total_added_to_s2a = ?, + success_rate = ?, + duration_seconds = ?, + status = 'completed', + errors = ? + WHERE id = ?`, + finishedAt, registered, addedToS2A, successRate, duration, errors, id, + ) + return err +} + +// GetBatchRuns 获取批次记录列表 +func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) { + if limit <= 0 { + limit = 50 + } + + rows, err := d.db.Query(` + SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a, + success_rate, duration_seconds, status, COALESCE(errors, '') + FROM batch_runs + ORDER BY started_at DESC + LIMIT ? + `, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var runs []BatchRun + for rows.Next() { + var run BatchRun + var finishedAt sql.NullTime + err := rows.Scan( + &run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered, + &run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors, + ) + if err != nil { + continue + } + if finishedAt.Valid { + run.FinishedAt = finishedAt.Time + } + runs = append(runs, run) + } + return runs, nil +} + +// GetBatchRunStats 获取批次统计 +func (d *DB) GetBatchRunStats() map[string]interface{} { + stats := make(map[string]interface{}) + + var totalAdded, todayAdded int + var avgRate float64 + + // 总入库数 + d.db.QueryRow("SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs").Scan(&totalAdded) + stats["total_added"] = totalAdded + + // 今日入库数 + d.db.QueryRow(` + SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs + WHERE DATE(started_at) = DATE('now', 'localtime') + `).Scan(&todayAdded) + stats["today_added"] = todayAdded + + // 平均成功率 + d.db.QueryRow("SELECT COALESCE(AVG(success_rate), 0) FROM batch_runs WHERE status = 'completed'").Scan(&avgRate) + stats["avg_success_rate"] = avgRate + + // 本周入库数 + var weekAdded int + d.db.QueryRow(` + SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs + WHERE started_at >= datetime('now', '-7 days') + `).Scan(&weekAdded) + stats["week_added"] = weekAdded + + return stats +} + // Close 关闭数据库 func (d *DB) Close() error { if d.db != nil { diff --git a/backend/internal/logger/logger.go b/backend/internal/logger/logger.go index ce141da..f0336c5 100644 --- a/backend/internal/logger/logger.go +++ b/backend/internal/logger/logger.go @@ -2,6 +2,8 @@ package logger import ( "fmt" + "strconv" + "strings" "sync" "time" ) @@ -84,6 +86,35 @@ func log(level, message, email, module string) { colorYellow := "\033[33m" colorCyan := "\033[36m" + // Team 颜色列表(用于区分不同 Team) + teamColors := []string{ + "\033[38;5;39m", // 亮蓝 + "\033[38;5;208m", // 橙色 + "\033[38;5;141m", // 紫色 + "\033[38;5;48m", // 青绿 + "\033[38;5;197m", // 粉红 + "\033[38;5;226m", // 亮黄 + "\033[38;5;87m", // 青色 + "\033[38;5;156m", // 浅绿 + "\033[38;5;219m", // 浅粉 + "\033[38;5;117m", // 天蓝 + } + + // 从消息中提取 Team 编号 + teamColor := "" + if strings.Contains(message, "[Team ") { + start := strings.Index(message, "[Team ") + if start >= 0 { + end := strings.Index(message[start:], "]") + if end > 0 { + teamStr := message[start+6 : start+end] + if teamNum, err := strconv.Atoi(teamStr); err == nil && teamNum > 0 { + teamColor = teamColors[(teamNum-1)%len(teamColors)] + } + } + } + } + prefix := "" color := "" switch level { @@ -101,16 +132,22 @@ func log(level, message, email, module string) { color = colorYellow } + // 如果是 Team 相关日志,消息使用 Team 颜色 + msgColor := colorReset + if teamColor != "" { + msgColor = teamColor + } + if email != "" { - fmt.Printf("%s%s%s %s[%s]%s [%s] %s - %s\n", + fmt.Printf("%s%s%s %s[%s]%s [%s] %s - %s%s%s\n", colorGray, timestamp, colorReset, color, prefix, colorReset, - module, email, message) + module, email, msgColor, message, colorReset) } else { - fmt.Printf("%s%s%s %s[%s]%s [%s] %s\n", + fmt.Printf("%s%s%s %s[%s]%s [%s] %s%s%s\n", colorGray, timestamp, colorReset, color, prefix, colorReset, - module, message) + module, msgColor, message, colorReset) } } diff --git a/frontend/src/components/LiveLogViewer.tsx b/frontend/src/components/LiveLogViewer.tsx new file mode 100644 index 0000000..f999d58 --- /dev/null +++ b/frontend/src/components/LiveLogViewer.tsx @@ -0,0 +1,204 @@ +import { useState, useEffect, useRef, useCallback } from 'react' +import { Terminal, Play, Pause, Trash2, ChevronDown } from 'lucide-react' + +interface LogEntry { + type: string + timestamp: string + level: string + message: string + email?: string + module?: string +} + +const levelColors: Record = { + success: 'text-green-400', + error: 'text-red-400', + warning: 'text-yellow-400', + info: 'text-cyan-400', +} + +const levelBgColors: Record = { + success: 'bg-green-900/20', + error: 'bg-red-900/20', + warning: 'bg-yellow-900/20', + info: 'bg-cyan-900/20', +} + +interface LiveLogViewerProps { + maxLogs?: number + autoScroll?: boolean + className?: string +} + +export default function LiveLogViewer({ + maxLogs = 200, + autoScroll: initialAutoScroll = true, + className = '', +}: LiveLogViewerProps) { + const [logs, setLogs] = useState([]) + const [isConnected, setIsConnected] = useState(false) + const [isPaused, setIsPaused] = useState(false) + const [autoScroll, setAutoScroll] = useState(initialAutoScroll) + const logContainerRef = useRef(null) + const eventSourceRef = useRef(null) + const pausedLogsRef = useRef([]) + + // 连接 SSE + const connect = useCallback(() => { + if (eventSourceRef.current) { + eventSourceRef.current.close() + } + + const es = new EventSource('/api/logs/stream') + eventSourceRef.current = es + + es.onopen = () => { + setIsConnected(true) + } + + es.onmessage = (event) => { + try { + const data = JSON.parse(event.data) as LogEntry + + if (data.type === 'connected') { + console.log('SSE connected:', data) + return + } + + if (data.type === 'log') { + if (isPaused) { + pausedLogsRef.current.push(data) + } else { + setLogs((prev) => { + const newLogs = [...prev, data] + return newLogs.slice(-maxLogs) + }) + } + } + } catch (e) { + console.error('Parse error:', e) + } + } + + es.onerror = () => { + setIsConnected(false) + // 自动重连 + setTimeout(connect, 3000) + } + }, [isPaused, maxLogs]) + + // 组件挂载时连接 + useEffect(() => { + connect() + return () => { + if (eventSourceRef.current) { + eventSourceRef.current.close() + } + } + }, [connect]) + + // 自动滚动 + useEffect(() => { + if (autoScroll && logContainerRef.current) { + logContainerRef.current.scrollTop = logContainerRef.current.scrollHeight + } + }, [logs, autoScroll]) + + // 恢复暂停的日志 + const handleResume = () => { + setIsPaused(false) + if (pausedLogsRef.current.length > 0) { + setLogs((prev) => { + const newLogs = [...prev, ...pausedLogsRef.current] + pausedLogsRef.current = [] + return newLogs.slice(-maxLogs) + }) + } + } + + const handleClear = () => { + setLogs([]) + } + + return ( +
+ {/* Header */} +
+
+ + 实时日志 + + + {isConnected ? '已连接' : '连接中...'} + +
+
+ {logs.length} 条日志 + + + +
+
+ + {/* Log Content */} +
+ {logs.length === 0 ? ( +
+ 等待日志... +
+ ) : ( + logs.map((log, index) => ( +
+ {log.timestamp} + + [{log.level}] + + [{log.module}] + {log.email && ( + {log.email} + )} + {log.message} +
+ )) + )} +
+ + {/* Paused indicator */} + {isPaused && pausedLogsRef.current.length > 0 && ( +
+ 已暂停 - 有 {pausedLogsRef.current.length} 条新日志等待显示 +
+ )} +
+ ) +} diff --git a/frontend/src/pages/Monitor.tsx b/frontend/src/pages/Monitor.tsx index e84393c..6a9852e 100644 --- a/frontend/src/pages/Monitor.tsx +++ b/frontend/src/pages/Monitor.tsx @@ -15,6 +15,7 @@ import { Save, } from 'lucide-react' import { Card, CardHeader, CardTitle, CardContent, Button, Input, Switch } from '../components/common' +import LiveLogViewer from '../components/LiveLogViewer' import type { DashboardStats } from '../types' interface PoolStatus { @@ -736,6 +737,8 @@ export default function Monitor() { )} + {/* 实时日志 */} + ) } diff --git a/frontend/src/pages/Records.tsx b/frontend/src/pages/Records.tsx index 99ff420..127452d 100644 --- a/frontend/src/pages/Records.tsx +++ b/frontend/src/pages/Records.tsx @@ -1,39 +1,92 @@ -import { useState, useMemo } from 'react' -import { Trash2, Calendar } from 'lucide-react' -import { RecordList, RecordStats } from '../components/records' +import { useState, useEffect } from 'react' +import { RefreshCw, Calendar, TrendingUp, CheckCircle, Clock, AlertCircle } from 'lucide-react' import { Card, CardHeader, CardTitle, CardContent, Button, Input } from '../components/common' -import { useRecords } from '../hooks/useRecords' + +interface BatchRun { + id: number + started_at: string + finished_at: string + total_owners: number + total_registered: number + total_added_to_s2a: number + success_rate: number + duration_seconds: number + status: string + errors: string +} + +interface BatchStats { + total_added: number + today_added: number + avg_success_rate: number + week_added: number +} export default function Records() { - const { records, deleteRecord, clearRecords, getStats } = useRecords() + const [runs, setRuns] = useState([]) + const [stats, setStats] = useState({ total_added: 0, today_added: 0, avg_success_rate: 0, week_added: 0 }) + const [loading, setLoading] = useState(true) const [startDate, setStartDate] = useState('') const [endDate, setEndDate] = useState('') - const stats = useMemo(() => getStats(), [getStats]) + const fetchData = async () => { + setLoading(true) + try { + const [runsRes, statsRes] = await Promise.all([ + fetch('/api/batch/runs'), + fetch('/api/batch/stats') + ]) - const filteredRecords = useMemo(() => { - if (!startDate && !endDate) return records + if (runsRes.ok) { + const data = await runsRes.json() + if (data.code === 0) { + setRuns(data.data || []) + } + } - return records.filter((record) => { - const recordDate = new Date(record.timestamp) - const start = startDate ? new Date(startDate) : null - const end = endDate ? new Date(endDate + 'T23:59:59') : null - - if (start && recordDate < start) return false - if (end && recordDate > end) return false - return true - }) - }, [records, startDate, endDate]) - - const handleClearFilter = () => { - setStartDate('') - setEndDate('') + if (statsRes.ok) { + const data = await statsRes.json() + if (data.code === 0) { + setStats(data.data) + } + } + } catch (e) { + console.error('获取数据失败:', e) + } + setLoading(false) } - const handleClearAll = () => { - if (window.confirm('确定要清空所有记录吗?此操作不可恢复。')) { - clearRecords() - } + useEffect(() => { + fetchData() + }, []) + + // 筛选记录 + const filteredRuns = runs.filter((run) => { + if (!startDate && !endDate) return true + const recordDate = new Date(run.started_at) + const start = startDate ? new Date(startDate) : null + const end = endDate ? new Date(endDate + 'T23:59:59') : null + if (start && recordDate < start) return false + if (end && recordDate > end) return false + return true + }) + + const formatDuration = (seconds: number) => { + if (seconds < 60) return `${seconds}秒` + const mins = Math.floor(seconds / 60) + const secs = seconds % 60 + return `${mins}分${secs}秒` + } + + const formatTime = (dateStr: string) => { + if (!dateStr) return '-' + const date = new Date(dateStr) + return date.toLocaleString('zh-CN', { + month: '2-digit', + day: '2-digit', + hour: '2-digit', + minute: '2-digit' + }) } return ( @@ -44,20 +97,67 @@ export default function Records() {

加号记录

查看历史入库记录

- {records.length > 0 && ( - - )} + - {/* Stats */} - + {/* Stats Cards */} +
+ + +
+ +
+
总入库
+
{stats.total_added}
+
+
+
+
+ + + +
+ +
+
成功率
+
{stats.avg_success_rate.toFixed(1)}%
+
+
+
+
+ + + +
+ +
+
今日入库
+
{stats.today_added}
+
+
+
+
+ + + +
+ +
+
本周入库
+
{stats.week_added}
+
+
+
+
+
{/* Filter */} @@ -66,11 +166,6 @@ export default function Records() { 日期筛选 - {(startDate || endDate) && ( - - )}
@@ -91,8 +186,7 @@ export default function Records() { />
- 共 {filteredRecords.length} 条记录 - {filteredRecords.length !== records.length && (已筛选)} + 共 {filteredRuns.length} 条记录
@@ -104,7 +198,69 @@ export default function Records() { 记录列表 - + {loading ? ( +
+ + 加载中... +
+ ) : filteredRuns.length === 0 ? ( +
+ + 暂无加号记录 +
+ ) : ( +
+ + + + + + + + + + + + + + {filteredRuns.map((run) => ( + + + + + + + + + + ))} + +
时间母号数注册入库成功率耗时状态
{formatTime(run.started_at)}{run.total_owners}{run.total_registered} + {run.total_added_to_s2a} + + = 80 ? 'text-green-600' : run.success_rate >= 50 ? 'text-yellow-600' : 'text-red-600'}`}> + {run.success_rate.toFixed(1)}% + + {formatDuration(run.duration_seconds)} + {run.status === 'completed' ? ( + + + 完成 + + ) : run.status === 'running' ? ( + + + 运行中 + + ) : ( + + + {run.status} + + )} +
+
+ )}