diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index d8d03d4..621e88f 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -389,6 +389,11 @@ func runTeamProcess(req TeamProcessRequest) { taskChan := make(chan int, totalOwners) resultChan := make(chan TeamProcessResult, totalOwners) + // 连续入库为0的暂停机制 + var pauseMu sync.Mutex + pauseCond := sync.NewCond(&pauseMu) + paused := false + var wg sync.WaitGroup // 启动 worker @@ -400,6 +405,13 @@ func runTeamProcess(req TeamProcessRequest) { if isStopped() { return } + // 检查是否处于暂停状态,等待恢复 + pauseMu.Lock() + for paused { + pauseCond.Wait() + } + pauseMu.Unlock() + result := processSingleTeam(idx, req) resultChan <- result atomic.AddInt32(&teamProcessState.Completed, 1) @@ -428,6 +440,9 @@ func runTeamProcess(req TeamProcessRequest) { close(resultChan) }() + // 连续入库为0的计数器 + consecutiveZeroS2A := 0 + // 收集结果并统计 for result := range resultChan { teamProcessState.mu.Lock() @@ -438,6 +453,40 @@ func runTeamProcess(req TeamProcessRequest) { totalAddedToS2A += result.AddedToS2A allErrors = append(allErrors, result.Errors...) + // 检测连续入库为0的情况 + if result.AddedToS2A == 0 { + consecutiveZeroS2A++ + if consecutiveZeroS2A >= 2 { + logger.Warning(fmt.Sprintf("⚠ 连续 %d 个 Team 入库数为 0 (最近: Team %d - %s),暂停 1 分钟后继续...", + consecutiveZeroS2A, result.TeamIndex, result.OwnerEmail), "", "team") + + // 设置暂停标志,阻止 worker 取新任务 + pauseMu.Lock() + paused = true + pauseMu.Unlock() + + // 等待 1 分钟(可被停止信号中断) + select { + case <-time.After(1 * time.Minute): + logger.Info("暂停结束,恢复批量处理...", "", "team") + case <-teamProcessState.stopCh: + logger.Warning("暂停期间收到停止信号,终止处理", "", "team") + } + + // 恢复 worker + pauseMu.Lock() + paused = false + pauseMu.Unlock() + pauseCond.Broadcast() + + // 重置计数器 + consecutiveZeroS2A = 0 + } + } else { + // 入库不为0,重置连续计数 + consecutiveZeroS2A = 0 + } + // 保存单个Team结果到数据库 if database.Instance != nil && batchID > 0 { dbResult := database.BatchTeamResult{