feat: Implement core backend services for team owner management, SQLite persistence, and logging, alongside frontend monitoring and record views.

This commit is contained in:
2026-01-30 18:15:50 +08:00
parent e61430b60d
commit 165c6d69b9
7 changed files with 844 additions and 125 deletions

View File

@@ -92,6 +92,7 @@ func startServer(cfg *config.Config) {
// 日志 API
mux.HandleFunc("/api/logs", api.CORS(handleGetLogs))
mux.HandleFunc("/api/logs/clear", api.CORS(handleClearLogs))
mux.HandleFunc("/api/logs/stream", handleLogStream) // SSE 实时日志
// S2A 代理 API
mux.HandleFunc("/api/s2a/test", api.CORS(handleS2ATest))
@@ -117,6 +118,10 @@ func startServer(cfg *config.Config) {
mux.HandleFunc("/api/team/status", api.CORS(api.HandleTeamProcessStatus))
mux.HandleFunc("/api/team/stop", api.CORS(api.HandleTeamProcessStop))
// 批次记录 API
mux.HandleFunc("/api/batch/runs", api.CORS(handleBatchRuns))
mux.HandleFunc("/api/batch/stats", api.CORS(handleBatchStats))
// 监控设置 API
mux.HandleFunc("/api/monitor/settings", api.CORS(api.HandleGetMonitorSettings))
mux.HandleFunc("/api/monitor/settings/save", api.CORS(api.HandleSaveMonitorSettings))
@@ -252,6 +257,84 @@ func handleClearLogs(w http.ResponseWriter, r *http.Request) {
api.Success(w, map[string]string{"message": "日志已清空"})
}
// handleBatchRuns 获取批次运行记录
func handleBatchRuns(w http.ResponseWriter, r *http.Request) {
if database.Instance == nil {
api.Error(w, http.StatusInternalServerError, "数据库未初始化")
return
}
runs, err := database.Instance.GetBatchRuns(50)
if err != nil {
api.Error(w, http.StatusInternalServerError, fmt.Sprintf("获取记录失败: %v", err))
return
}
api.Success(w, runs)
}
// handleBatchStats 获取批次统计
func handleBatchStats(w http.ResponseWriter, r *http.Request) {
if database.Instance == nil {
api.Error(w, http.StatusInternalServerError, "数据库未初始化")
return
}
stats := database.Instance.GetBatchRunStats()
api.Success(w, stats)
}
// handleLogStream SSE 实时日志流
func handleLogStream(w http.ResponseWriter, r *http.Request) {
// 设置 SSE 响应头
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("Access-Control-Allow-Origin", "*")
// 生成唯一 ID
listenerID := fmt.Sprintf("client-%d", time.Now().UnixNano())
// 订阅日志
logChan := logger.AddListener(listenerID)
defer logger.RemoveListener(listenerID)
// 获取 flusher
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "SSE not supported", http.StatusInternalServerError)
return
}
// 发送初始连接确认
fmt.Fprintf(w, "data: {\"type\":\"connected\",\"id\":\"%s\"}\n\n", listenerID)
flusher.Flush()
// 监听日志和连接关闭
ctx := r.Context()
for {
select {
case <-ctx.Done():
return
case entry, ok := <-logChan:
if !ok {
return
}
// 发送日志条目
data, _ := json.Marshal(map[string]interface{}{
"type": "log",
"timestamp": entry.Timestamp.Format("15:04:05"),
"level": entry.Level,
"message": entry.Message,
"email": entry.Email,
"module": entry.Module,
})
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
}
}
}
func handleS2ATest(w http.ResponseWriter, r *http.Request) {
if config.Global == nil || config.Global.S2AApiBase == "" {
api.Error(w, http.StatusBadRequest, "S2A 配置未设置")

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"
@@ -187,7 +186,17 @@ func runTeamProcess(req TeamProcessRequest) {
workerCount = 2 // 默认 2 个并发
}
logger.Info(fmt.Sprintf("Starting Team process: %d owners, %d concurrent workers", totalOwners, workerCount), "", "team")
// 创建批次记录
var batchID int64
if database.Instance != nil {
var err error
batchID, err = database.Instance.CreateBatchRun(totalOwners)
if err != nil {
logger.Error(fmt.Sprintf("创建批次记录失败: %v", err), "", "team")
}
}
logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")
// 任务队列
taskChan := make(chan int, totalOwners)
@@ -228,13 +237,42 @@ func runTeamProcess(req TeamProcessRequest) {
close(resultChan)
}()
// 统计总数
var totalRegistered, totalAddedToS2A int
var allErrors []string
for result := range resultChan {
teamProcessState.mu.Lock()
teamProcessState.Results = append(teamProcessState.Results, result)
teamProcessState.mu.Unlock()
totalRegistered += result.Registered
totalAddedToS2A += result.AddedToS2A
allErrors = append(allErrors, result.Errors...)
}
logger.Success(fmt.Sprintf("Team process complete: %d/%d teams processed", teamProcessState.Completed, totalOwners), "", "team")
// 更新批次记录
if database.Instance != nil && batchID > 0 {
errorsStr := ""
if len(allErrors) > 0 {
// 只保留前10条错误
if len(allErrors) > 10 {
allErrors = allErrors[:10]
}
errorsStr = fmt.Sprintf("%v", allErrors)
}
database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr)
}
// 计算成功率
expectedTotal := totalOwners * req.MembersPerTeam
successRate := float64(0)
if expectedTotal > 0 {
successRate = float64(totalAddedToS2A) / float64(expectedTotal) * 100
}
logger.Success(fmt.Sprintf("批量处理完成: %d/%d 个 Team | 注册: %d, 入库: %d, 成功率: %.1f%%",
teamProcessState.Completed, totalOwners, totalRegistered, totalAddedToS2A, successRate), "", "team")
}
// processSingleTeam 处理单个 Team
@@ -249,7 +287,24 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
logPrefix := fmt.Sprintf("[Team %d]", idx+1)
logger.Info(fmt.Sprintf("%s Starting with owner: %s", logPrefix, owner.Email), owner.Email, "team")
logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team")
// 标记 owner 为处理中
if database.Instance != nil {
database.Instance.MarkOwnerAsProcessing(owner.Email)
}
// 处理失败时的清理函数
markOwnerResult := func(success bool) {
if database.Instance != nil {
if success {
database.Instance.MarkOwnerAsUsed(owner.Email)
} else {
// 失败时恢复为 valid允许重试
database.Instance.MarkOwnerAsFailed(owner.Email)
}
}
}
// Step 1: 获取 Team ID优先使用已存储的 account_id
var teamID string
@@ -268,92 +323,102 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err))
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team")
markOwnerResult(false)
return result
}
logger.Success(fmt.Sprintf("%s 获取到 Team ID: %s", logPrefix, teamID), owner.Email, "team")
}
result.TeamID = teamID
// Step 2: 生成成员邮箱并发送邀请
// Step 2: 并发注册成员
// 每个成员:邀请 → 注册失败重试1次
// Team 有4次额外补救机会
type MemberAccount struct {
Email string
Password string
Success bool
}
children := make([]MemberAccount, req.MembersPerTeam)
for i := 0; i < req.MembersPerTeam; i++ {
children[i].Email = mail.GenerateEmail()
children[i].Password = register.GeneratePassword()
logger.Info(fmt.Sprintf("%s [Member %d] Email: %s", logPrefix, i+1, children[i].Email), children[i].Email, "team")
}
// 发送邀请
inviteEmails := make([]string, req.MembersPerTeam)
for i, c := range children {
inviteEmails[i] = c.Email
}
if err := inviter.SendInvites(inviteEmails); err != nil {
result.Errors = append(result.Errors, fmt.Sprintf("发送邀请失败: %v", err))
result.DurationMs = time.Since(startTime).Milliseconds()
return result
}
logger.Success(fmt.Sprintf("%s Sent %d invite(s)", logPrefix, len(inviteEmails)), owner.Email, "team")
// Step 3: 并发注册成员
var memberMu sync.Mutex
var memberWg sync.WaitGroup
memberMutex := sync.Mutex{}
for i := range children {
memberWg.Add(1)
go func(memberIdx int) {
defer memberWg.Done()
// 注册单个成员的函数带1次重试
registerMember := func(memberIdx int, email, password string) bool {
name := register.GenerateName()
birthdate := register.GenerateBirthdate()
memberMutex.Lock()
email := children[memberIdx].Email
password := children[memberIdx].Password
memberMutex.Unlock()
name := register.GenerateName()
birthdate := register.GenerateBirthdate()
// 重试逻辑
for attempt := 0; attempt < 3; attempt++ {
if !teamProcessState.Running {
return
}
if attempt > 0 {
email = mail.GenerateEmail()
password = register.GeneratePassword()
logger.Info(fmt.Sprintf("%s [Member %d] Retry %d: %s", logPrefix, memberIdx+1, attempt, email), email, "team")
if err := inviter.SendInvites([]string{email}); err != nil {
continue
}
}
_, err := registerWithTimeout(email, password, name, birthdate, req.Proxy)
if err != nil {
if strings.Contains(err.Error(), "验证码") {
continue
}
result.Errors = append(result.Errors, fmt.Sprintf("Member %d: %v", memberIdx+1, err))
return
}
memberMutex.Lock()
children[memberIdx].Email = email
children[memberIdx].Password = password
children[memberIdx].Success = true
memberMutex.Unlock()
logger.Success(fmt.Sprintf("%s [Member %d] Registered", logPrefix, memberIdx+1), email, "team")
return
for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次首次+1次重试
if !teamProcessState.Running {
return false
}
currentEmail := email
currentPassword := password
if attempt > 0 {
// 重试时使用新邮箱
currentEmail = mail.GenerateEmail()
currentPassword = register.GeneratePassword()
logger.Warning(fmt.Sprintf("%s [成员 %d] 重试,新邮箱: %s", logPrefix, memberIdx+1, currentEmail), currentEmail, "team")
}
// 发送邀请
if err := inviter.SendInvites([]string{currentEmail}); err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
continue
}
// 注册
_, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy)
if err != nil {
logger.Error(fmt.Sprintf("%s [成员 %d] 注册失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team")
continue
}
// 成功
memberMu.Lock()
children[memberIdx] = MemberAccount{Email: currentEmail, Password: currentPassword, Success: true}
memberMu.Unlock()
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 注册成功", logPrefix, memberIdx+1), currentEmail, "team")
return true
}
return false
}
// 第一轮并发注册4个成员
logger.Info(fmt.Sprintf("%s 开始并发注册 %d 个成员", logPrefix, req.MembersPerTeam), owner.Email, "team")
for i := 0; i < req.MembersPerTeam; i++ {
memberWg.Add(1)
go func(idx int) {
defer memberWg.Done()
email := mail.GenerateEmail()
password := register.GeneratePassword()
logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team")
registerMember(idx, email, password)
}(i)
}
memberWg.Wait()
// 统计失败的成员
failedSlots := make([]int, 0)
for i, c := range children {
if !c.Success {
failedSlots = append(failedSlots, i)
}
}
// 第二轮Team 有 4 次额外补救机会
teamRetries := 4
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running; retry++ {
slotIdx := failedSlots[0]
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
email := mail.GenerateEmail()
password := register.GeneratePassword()
if registerMember(slotIdx, email, password) {
failedSlots = failedSlots[1:] // 成功,移除这个槽位
}
}
// 统计注册成功数
registeredChildren := make([]MemberAccount, 0)
for _, c := range children {
@@ -363,7 +428,11 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
result.Registered++
}
}
logger.Info(fmt.Sprintf("%s Registered: %d/%d", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team")
if len(failedSlots) > 0 {
result.Errors = append(result.Errors, fmt.Sprintf("%d 个成员注册失败", len(failedSlots)))
}
logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team")
// Step 4: S2A 授权入库(成员)
for i, child := range registeredChildren {
@@ -407,7 +476,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
}
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [Member %d] Added to S2A", logPrefix, i+1), child.Email, "team")
logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team")
}
// Step 5: 母号也入库(如果开启)
@@ -442,14 +511,18 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult {
result.Errors = append(result.Errors, fmt.Sprintf("Owner S2A: %v", err))
} else {
result.AddedToS2A++
logger.Success(fmt.Sprintf("%s [Owner] Added to S2A", logPrefix), owner.Email, "team")
logger.Success(fmt.Sprintf("%s [母号] ✓ 入库成功", logPrefix), owner.Email, "team")
}
}
}
}
result.DurationMs = time.Since(startTime).Milliseconds()
logger.Success(fmt.Sprintf("%s Complete: %d registered, %d in S2A", logPrefix, result.Registered, result.AddedToS2A), owner.Email, "team")
logger.Success(fmt.Sprintf("%s 完成 | 注册: %d, 入库: %d, 耗时: %.1fs", logPrefix, result.Registered, result.AddedToS2A, float64(result.DurationMs)/1000), owner.Email, "team")
// 根据入库结果标记 owner 状态
// 只要有任何一个账号入库成功,就标记为已使用
markOwnerResult(result.AddedToS2A > 0)
return result
}

View File

@@ -67,6 +67,22 @@ func (d *DB) createTables() error {
value TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- 批次运行记录表
CREATE TABLE IF NOT EXISTS batch_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at DATETIME NOT NULL,
finished_at DATETIME,
total_owners INTEGER DEFAULT 0,
total_registered INTEGER DEFAULT 0,
total_added_to_s2a INTEGER DEFAULT 0,
success_rate REAL DEFAULT 0,
duration_seconds INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
errors TEXT
);
CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at);
`)
return err
}
@@ -195,7 +211,7 @@ func (d *DB) GetTeamOwners(status string, limit, offset int) ([]TeamOwner, int,
return owners, total, nil
}
// GetPendingOwners 获取待处理
// GetPendingOwners 获取待处理(排除已使用和处理中的)
func (d *DB) GetPendingOwners() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at
@@ -219,6 +235,24 @@ func (d *DB) GetPendingOwners() ([]TeamOwner, error) {
return owners, nil
}
// MarkOwnerAsUsed 标记 Owner 为已使用
func (d *DB) MarkOwnerAsUsed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'used' WHERE email = ?", email)
return err
}
// MarkOwnerAsProcessing 标记 Owner 为处理中
func (d *DB) MarkOwnerAsProcessing(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'processing' WHERE email = ?", email)
return err
}
// MarkOwnerAsFailed 标记 Owner 为失败(可重试)
func (d *DB) MarkOwnerAsFailed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'valid' WHERE email = ?", email)
return err
}
// UpdateOwnerStatus 更新状态
func (d *DB) UpdateOwnerStatus(id int64, status string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id)
@@ -293,6 +327,135 @@ func (d *DB) GetOwnerStats() map[string]int {
return stats
}
// BatchRun 批次运行记录
type BatchRun struct {
ID int64 `json:"id"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
TotalOwners int `json:"total_owners"`
TotalRegistered int `json:"total_registered"`
TotalAddedToS2A int `json:"total_added_to_s2a"`
SuccessRate float64 `json:"success_rate"`
DurationSeconds int `json:"duration_seconds"`
Status string `json:"status"`
Errors string `json:"errors,omitempty"`
}
// CreateBatchRun 创建批次记录
func (d *DB) CreateBatchRun(totalOwners int) (int64, error) {
result, err := d.db.Exec(
`INSERT INTO batch_runs (started_at, total_owners, status) VALUES (?, ?, 'running')`,
time.Now(), totalOwners,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// UpdateBatchRun 更新批次记录
func (d *DB) UpdateBatchRun(id int64, registered, addedToS2A int, errors string) error {
finishedAt := time.Now()
// 获取开始时间计算耗时
var startedAt time.Time
var totalOwners int
d.db.QueryRow("SELECT started_at, total_owners FROM batch_runs WHERE id = ?", id).Scan(&startedAt, &totalOwners)
duration := int(finishedAt.Sub(startedAt).Seconds())
// 计算成功率(以入库数为准)
var successRate float64
expectedTotal := totalOwners * 4 // 每个 owner 应产生 4 个成员
if expectedTotal > 0 {
successRate = float64(addedToS2A) / float64(expectedTotal) * 100
}
_, err := d.db.Exec(
`UPDATE batch_runs SET
finished_at = ?,
total_registered = ?,
total_added_to_s2a = ?,
success_rate = ?,
duration_seconds = ?,
status = 'completed',
errors = ?
WHERE id = ?`,
finishedAt, registered, addedToS2A, successRate, duration, errors, id,
)
return err
}
// GetBatchRuns 获取批次记录列表
func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) {
if limit <= 0 {
limit = 50
}
rows, err := d.db.Query(`
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
success_rate, duration_seconds, status, COALESCE(errors, '')
FROM batch_runs
ORDER BY started_at DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var runs []BatchRun
for rows.Next() {
var run BatchRun
var finishedAt sql.NullTime
err := rows.Scan(
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
)
if err != nil {
continue
}
if finishedAt.Valid {
run.FinishedAt = finishedAt.Time
}
runs = append(runs, run)
}
return runs, nil
}
// GetBatchRunStats 获取批次统计
func (d *DB) GetBatchRunStats() map[string]interface{} {
stats := make(map[string]interface{})
var totalAdded, todayAdded int
var avgRate float64
// 总入库数
d.db.QueryRow("SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs").Scan(&totalAdded)
stats["total_added"] = totalAdded
// 今日入库数
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE DATE(started_at) = DATE('now', 'localtime')
`).Scan(&todayAdded)
stats["today_added"] = todayAdded
// 平均成功率
d.db.QueryRow("SELECT COALESCE(AVG(success_rate), 0) FROM batch_runs WHERE status = 'completed'").Scan(&avgRate)
stats["avg_success_rate"] = avgRate
// 本周入库数
var weekAdded int
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE started_at >= datetime('now', '-7 days')
`).Scan(&weekAdded)
stats["week_added"] = weekAdded
return stats
}
// Close 关闭数据库
func (d *DB) Close() error {
if d.db != nil {

View File

@@ -2,6 +2,8 @@ package logger
import (
"fmt"
"strconv"
"strings"
"sync"
"time"
)
@@ -84,6 +86,35 @@ func log(level, message, email, module string) {
colorYellow := "\033[33m"
colorCyan := "\033[36m"
// Team 颜色列表(用于区分不同 Team
teamColors := []string{
"\033[38;5;39m", // 亮蓝
"\033[38;5;208m", // 橙色
"\033[38;5;141m", // 紫色
"\033[38;5;48m", // 青绿
"\033[38;5;197m", // 粉红
"\033[38;5;226m", // 亮黄
"\033[38;5;87m", // 青色
"\033[38;5;156m", // 浅绿
"\033[38;5;219m", // 浅粉
"\033[38;5;117m", // 天蓝
}
// 从消息中提取 Team 编号
teamColor := ""
if strings.Contains(message, "[Team ") {
start := strings.Index(message, "[Team ")
if start >= 0 {
end := strings.Index(message[start:], "]")
if end > 0 {
teamStr := message[start+6 : start+end]
if teamNum, err := strconv.Atoi(teamStr); err == nil && teamNum > 0 {
teamColor = teamColors[(teamNum-1)%len(teamColors)]
}
}
}
}
prefix := ""
color := ""
switch level {
@@ -101,16 +132,22 @@ func log(level, message, email, module string) {
color = colorYellow
}
// 如果是 Team 相关日志,消息使用 Team 颜色
msgColor := colorReset
if teamColor != "" {
msgColor = teamColor
}
if email != "" {
fmt.Printf("%s%s%s %s[%s]%s [%s] %s - %s\n",
fmt.Printf("%s%s%s %s[%s]%s [%s] %s - %s%s%s\n",
colorGray, timestamp, colorReset,
color, prefix, colorReset,
module, email, message)
module, email, msgColor, message, colorReset)
} else {
fmt.Printf("%s%s%s %s[%s]%s [%s] %s\n",
fmt.Printf("%s%s%s %s[%s]%s [%s] %s%s%s\n",
colorGray, timestamp, colorReset,
color, prefix, colorReset,
module, message)
module, msgColor, message, colorReset)
}
}