feat(team_process): Add pause mechanism for consecutive zero S2A entries
- Implement pause/resume mechanism using sync.Cond to halt worker goroutines - Add consecutive zero S2A counter to detect when no accounts are added for 2+ teams - Trigger automatic 1-minute pause when consecutive zero entries detected - Log warning messages with team details when pause is triggered - Reset counter on successful S2A entries or when pause completes - Allow stop signal to interrupt pause period for graceful shutdown - Broadcast resume signal to all waiting workers after pause completes - Prevents resource exhaustion and provides recovery time during low-activity periods
This commit is contained in:
@@ -389,6 +389,11 @@ func runTeamProcess(req TeamProcessRequest) {
|
|||||||
taskChan := make(chan int, totalOwners)
|
taskChan := make(chan int, totalOwners)
|
||||||
resultChan := make(chan TeamProcessResult, totalOwners)
|
resultChan := make(chan TeamProcessResult, totalOwners)
|
||||||
|
|
||||||
|
// 连续入库为0的暂停机制
|
||||||
|
var pauseMu sync.Mutex
|
||||||
|
pauseCond := sync.NewCond(&pauseMu)
|
||||||
|
paused := false
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
|
|
||||||
// 启动 worker
|
// 启动 worker
|
||||||
@@ -400,6 +405,13 @@ func runTeamProcess(req TeamProcessRequest) {
|
|||||||
if isStopped() {
|
if isStopped() {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
// 检查是否处于暂停状态,等待恢复
|
||||||
|
pauseMu.Lock()
|
||||||
|
for paused {
|
||||||
|
pauseCond.Wait()
|
||||||
|
}
|
||||||
|
pauseMu.Unlock()
|
||||||
|
|
||||||
result := processSingleTeam(idx, req)
|
result := processSingleTeam(idx, req)
|
||||||
resultChan <- result
|
resultChan <- result
|
||||||
atomic.AddInt32(&teamProcessState.Completed, 1)
|
atomic.AddInt32(&teamProcessState.Completed, 1)
|
||||||
@@ -428,6 +440,9 @@ func runTeamProcess(req TeamProcessRequest) {
|
|||||||
close(resultChan)
|
close(resultChan)
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// 连续入库为0的计数器
|
||||||
|
consecutiveZeroS2A := 0
|
||||||
|
|
||||||
// 收集结果并统计
|
// 收集结果并统计
|
||||||
for result := range resultChan {
|
for result := range resultChan {
|
||||||
teamProcessState.mu.Lock()
|
teamProcessState.mu.Lock()
|
||||||
@@ -438,6 +453,40 @@ func runTeamProcess(req TeamProcessRequest) {
|
|||||||
totalAddedToS2A += result.AddedToS2A
|
totalAddedToS2A += result.AddedToS2A
|
||||||
allErrors = append(allErrors, result.Errors...)
|
allErrors = append(allErrors, result.Errors...)
|
||||||
|
|
||||||
|
// 检测连续入库为0的情况
|
||||||
|
if result.AddedToS2A == 0 {
|
||||||
|
consecutiveZeroS2A++
|
||||||
|
if consecutiveZeroS2A >= 2 {
|
||||||
|
logger.Warning(fmt.Sprintf("⚠ 连续 %d 个 Team 入库数为 0 (最近: Team %d - %s),暂停 1 分钟后继续...",
|
||||||
|
consecutiveZeroS2A, result.TeamIndex, result.OwnerEmail), "", "team")
|
||||||
|
|
||||||
|
// 设置暂停标志,阻止 worker 取新任务
|
||||||
|
pauseMu.Lock()
|
||||||
|
paused = true
|
||||||
|
pauseMu.Unlock()
|
||||||
|
|
||||||
|
// 等待 1 分钟(可被停止信号中断)
|
||||||
|
select {
|
||||||
|
case <-time.After(1 * time.Minute):
|
||||||
|
logger.Info("暂停结束,恢复批量处理...", "", "team")
|
||||||
|
case <-teamProcessState.stopCh:
|
||||||
|
logger.Warning("暂停期间收到停止信号,终止处理", "", "team")
|
||||||
|
}
|
||||||
|
|
||||||
|
// 恢复 worker
|
||||||
|
pauseMu.Lock()
|
||||||
|
paused = false
|
||||||
|
pauseMu.Unlock()
|
||||||
|
pauseCond.Broadcast()
|
||||||
|
|
||||||
|
// 重置计数器
|
||||||
|
consecutiveZeroS2A = 0
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// 入库不为0,重置连续计数
|
||||||
|
consecutiveZeroS2A = 0
|
||||||
|
}
|
||||||
|
|
||||||
// 保存单个Team结果到数据库
|
// 保存单个Team结果到数据库
|
||||||
if database.Instance != nil && batchID > 0 {
|
if database.Instance != nil && batchID > 0 {
|
||||||
dbResult := database.BatchTeamResult{
|
dbResult := database.BatchTeamResult{
|
||||||
|
|||||||
Reference in New Issue
Block a user