From 1458b8b3e22307e8209d91b5a234beb7509b0158 Mon Sep 17 00:00:00 2001 From: kyx236 Date: Fri, 6 Feb 2026 21:18:53 +0800 Subject: [PATCH] feat: add team process API. --- backend/internal/api/team_process.go | 68 ++++++++++++++++++++++++---- 1 file changed, 58 insertions(+), 10 deletions(-) diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index eb877ca..c5c2d18 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -63,10 +63,21 @@ type TeamProcessState struct { Completed int32 `json:"completed"` Results []TeamProcessResult `json:"results"` mu sync.Mutex + stopCh chan struct{} // 停止信号 channel,close 后所有 select 立即返回 } var teamProcessState = &TeamProcessState{} +// isStopped 检查是否已收到停止信号(非阻塞) +func isStopped() bool { + select { + case <-teamProcessState.stopCh: + return true + default: + return false + } +} + // waitGroupWithTimeout 带超时的 WaitGroup 等待,超时返回 false func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { done := make(chan struct{}) @@ -74,6 +85,8 @@ func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { select { case <-done: return true + case <-teamProcessState.stopCh: + return false case <-time.After(timeout): return false } @@ -188,6 +201,7 @@ func HandleTeamProcess(w http.ResponseWriter, r *http.Request) { // 初始化状态 teamProcessState.Running = true + teamProcessState.stopCh = make(chan struct{}) teamProcessState.StartedAt = time.Now() teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理 teamProcessState.Completed = 0 @@ -232,6 +246,16 @@ func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) { } teamProcessState.Running = false + // 关闭 stopCh 广播停止信号,所有 select 监听的 goroutine 立即唤醒 + if teamProcessState.stopCh != nil { + select { + case <-teamProcessState.stopCh: + // 已经关闭过了,不重复关闭 + default: + close(teamProcessState.stopCh) + } + } + logger.Warning("收到停止信号,正在终止所有处理...", "", "team") Success(w, map[string]string{"message": "已发送停止信号"}) } @@ -373,7 +397,7 @@ func runTeamProcess(req TeamProcessRequest) { go func(workerID int) { defer wg.Done() for idx := range taskChan { - if !teamProcessState.Running { + if isStopped() { return } result := processSingleTeam(idx, req) @@ -386,10 +410,14 @@ func runTeamProcess(req TeamProcessRequest) { // 发送任务 go func() { for i := 0; i < totalOwners; i++ { - if !teamProcessState.Running { + select { + case <-teamProcessState.stopCh: + break + case taskChan <- i: + } + if isStopped() { break } - taskChan <- i } close(taskChan) }() @@ -660,9 +688,17 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1) memberStartTime := time.Now() - // 获取入库信号量(3分钟超时) + // 检查停止信号 + if isStopped() { + return false + } + + // 获取入库信号量(3分钟超时,可被停止信号中断) select { case s2aSem <- struct{}{}: + case <-teamProcessState.stopCh: + logger.Warning(fmt.Sprintf("%s 收到停止信号,跳过入库", memberLogPrefix), memberEmail, "team") + return false case <-time.After(3 * time.Minute): logger.Warning(fmt.Sprintf("%s 入库信号量等待超时 (3分钟)", memberLogPrefix), memberEmail, "team") atomic.AddInt32(&s2aFailCount, 1) @@ -685,6 +721,10 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul var lastError string for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 + // 检查停止信号 + if isStopped() { + return false + } if attempt > 0 { logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", memberLogPrefix, attempt+1), memberEmail, "team") } @@ -778,7 +818,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次(首次+1次重试) // 检查是否应该停止 - if !teamProcessState.Running || isTeamExhausted() { + if isStopped() || isTeamExhausted() { return false } @@ -835,8 +875,12 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul s2aWg.Add(1) go func(idx int, e, p string) { defer s2aWg.Done() - // 等待 3 秒,让账号状态稳定 - time.Sleep(3 * time.Second) + // 等待 3 秒,让账号状态稳定(可被停止信号中断) + select { + case <-time.After(3 * time.Second): + case <-teamProcessState.stopCh: + return + } success := doS2A(idx, e, p) memberMu.Lock() children[idx].S2ADone = true @@ -859,7 +903,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul go func(idx int) { defer regWg.Done() // 检查是否应该停止 - if isTeamExhausted() { + if isStopped() || isTeamExhausted() { return } email := mail.GenerateEmail() @@ -893,7 +937,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul // 第二轮:Team 有 4 次额外补救机会(如果 Team 未满) teamRetries := 4 - for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running && !isTeamExhausted(); retry++ { + for retry := 0; retry < teamRetries && len(failedSlots) > 0 && !isStopped() && !isTeamExhausted(); retry++ { slotIdx := failedSlots[0] logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team") @@ -936,7 +980,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul } // Step 4: 母号也入库(如果开启)- 带重试 - if req.IncludeOwner && teamProcessState.Running { + if req.IncludeOwner && !isStopped() { ownerLogPrefix := fmt.Sprintf("%s [母号 ]", logPrefix) ownerStartTime := time.Now() logger.Status(fmt.Sprintf("%s 母号入库中...", ownerLogPrefix), owner.Email, "team") @@ -944,6 +988,10 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul var ownerSuccess bool var lastError string for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 + // 检查停止信号 + if isStopped() { + break + } if attempt > 0 { logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", ownerLogPrefix, attempt+1), owner.Email, "team") }