feat: add team process API.

This commit is contained in:
2026-02-06 21:18:53 +08:00
parent 6c8a036018
commit 1458b8b3e2

View File

@@ -63,10 +63,21 @@ type TeamProcessState struct {
Completed int32 `json:"completed"` Completed int32 `json:"completed"`
Results []TeamProcessResult `json:"results"` Results []TeamProcessResult `json:"results"`
mu sync.Mutex mu sync.Mutex
stopCh chan struct{} // 停止信号 channelclose 后所有 select 立即返回
} }
var teamProcessState = &TeamProcessState{} var teamProcessState = &TeamProcessState{}
// isStopped 检查是否已收到停止信号(非阻塞)
func isStopped() bool {
select {
case <-teamProcessState.stopCh:
return true
default:
return false
}
}
// waitGroupWithTimeout 带超时的 WaitGroup 等待,超时返回 false // waitGroupWithTimeout 带超时的 WaitGroup 等待,超时返回 false
func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool { func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
done := make(chan struct{}) done := make(chan struct{})
@@ -74,6 +85,8 @@ func waitGroupWithTimeout(wg *sync.WaitGroup, timeout time.Duration) bool {
select { select {
case <-done: case <-done:
return true return true
case <-teamProcessState.stopCh:
return false
case <-time.After(timeout): case <-time.After(timeout):
return false return false
} }
@@ -188,6 +201,7 @@ func HandleTeamProcess(w http.ResponseWriter, r *http.Request) {
// 初始化状态 // 初始化状态
teamProcessState.Running = true teamProcessState.Running = true
teamProcessState.stopCh = make(chan struct{})
teamProcessState.StartedAt = time.Now() teamProcessState.StartedAt = time.Now()
teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理 teamProcessState.TotalTeams = len(req.Owners) // 所有 owners 都会处理
teamProcessState.Completed = 0 teamProcessState.Completed = 0
@@ -232,6 +246,16 @@ func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
} }
teamProcessState.Running = false teamProcessState.Running = false
// 关闭 stopCh 广播停止信号,所有 select 监听的 goroutine 立即唤醒
if teamProcessState.stopCh != nil {
select {
case <-teamProcessState.stopCh:
// 已经关闭过了,不重复关闭
default:
close(teamProcessState.stopCh)
}
}
logger.Warning("收到停止信号,正在终止所有处理...", "", "team")
Success(w, map[string]string{"message": "已发送停止信号"}) Success(w, map[string]string{"message": "已发送停止信号"})
} }
@@ -373,7 +397,7 @@ func runTeamProcess(req TeamProcessRequest) {
go func(workerID int) { go func(workerID int) {
defer wg.Done() defer wg.Done()
for idx := range taskChan { for idx := range taskChan {
if !teamProcessState.Running { if isStopped() {
return return
} }
result := processSingleTeam(idx, req) result := processSingleTeam(idx, req)
@@ -386,10 +410,14 @@ func runTeamProcess(req TeamProcessRequest) {
// 发送任务 // 发送任务
go func() { go func() {
for i := 0; i < totalOwners; i++ { for i := 0; i < totalOwners; i++ {
if !teamProcessState.Running { select {
case <-teamProcessState.stopCh:
break
case taskChan <- i:
}
if isStopped() {
break break
} }
taskChan <- i
} }
close(taskChan) close(taskChan)
}() }()
@@ -660,9 +688,17 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1) memberLogPrefix := fmt.Sprintf("%s [Member %d]", logPrefix, memberIdx+1)
memberStartTime := time.Now() memberStartTime := time.Now()
// 获取入库信号量3分钟超时 // 检查停止信号
if isStopped() {
return false
}
// 获取入库信号量3分钟超时可被停止信号中断
select { select {
case s2aSem <- struct{}{}: case s2aSem <- struct{}{}:
case <-teamProcessState.stopCh:
logger.Warning(fmt.Sprintf("%s 收到停止信号,跳过入库", memberLogPrefix), memberEmail, "team")
return false
case <-time.After(3 * time.Minute): case <-time.After(3 * time.Minute):
logger.Warning(fmt.Sprintf("%s 入库信号量等待超时 (3分钟)", memberLogPrefix), memberEmail, "team") logger.Warning(fmt.Sprintf("%s 入库信号量等待超时 (3分钟)", memberLogPrefix), memberEmail, "team")
atomic.AddInt32(&s2aFailCount, 1) atomic.AddInt32(&s2aFailCount, 1)
@@ -685,6 +721,10 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
var lastError string var lastError string
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
// 检查停止信号
if isStopped() {
return false
}
if attempt > 0 { if attempt > 0 {
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", memberLogPrefix, attempt+1), memberEmail, "team") logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", memberLogPrefix, attempt+1), memberEmail, "team")
} }
@@ -778,7 +818,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次首次+1次重试 for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次首次+1次重试
// 检查是否应该停止 // 检查是否应该停止
if !teamProcessState.Running || isTeamExhausted() { if isStopped() || isTeamExhausted() {
return false return false
} }
@@ -835,8 +875,12 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
s2aWg.Add(1) s2aWg.Add(1)
go func(idx int, e, p string) { go func(idx int, e, p string) {
defer s2aWg.Done() defer s2aWg.Done()
// 等待 3 秒,让账号状态稳定 // 等待 3 秒,让账号状态稳定(可被停止信号中断)
time.Sleep(3 * time.Second) select {
case <-time.After(3 * time.Second):
case <-teamProcessState.stopCh:
return
}
success := doS2A(idx, e, p) success := doS2A(idx, e, p)
memberMu.Lock() memberMu.Lock()
children[idx].S2ADone = true children[idx].S2ADone = true
@@ -859,7 +903,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
go func(idx int) { go func(idx int) {
defer regWg.Done() defer regWg.Done()
// 检查是否应该停止 // 检查是否应该停止
if isTeamExhausted() { if isStopped() || isTeamExhausted() {
return return
} }
email := mail.GenerateEmail() email := mail.GenerateEmail()
@@ -893,7 +937,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
// 第二轮Team 有 4 次额外补救机会(如果 Team 未满) // 第二轮Team 有 4 次额外补救机会(如果 Team 未满)
teamRetries := 4 teamRetries := 4
for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running && !isTeamExhausted(); retry++ { for retry := 0; retry < teamRetries && len(failedSlots) > 0 && !isStopped() && !isTeamExhausted(); retry++ {
slotIdx := failedSlots[0] slotIdx := failedSlots[0]
logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team") logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team")
@@ -936,7 +980,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
} }
// Step 4: 母号也入库(如果开启)- 带重试 // Step 4: 母号也入库(如果开启)- 带重试
if req.IncludeOwner && teamProcessState.Running { if req.IncludeOwner && !isStopped() {
ownerLogPrefix := fmt.Sprintf("%s [母号 ]", logPrefix) ownerLogPrefix := fmt.Sprintf("%s [母号 ]", logPrefix)
ownerStartTime := time.Now() ownerStartTime := time.Now()
logger.Status(fmt.Sprintf("%s 母号入库中...", ownerLogPrefix), owner.Email, "team") logger.Status(fmt.Sprintf("%s 母号入库中...", ownerLogPrefix), owner.Email, "team")
@@ -944,6 +988,10 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
var ownerSuccess bool var ownerSuccess bool
var lastError string var lastError string
for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 for attempt := 0; attempt < 2; attempt++ { // 最多重试1次
// 检查停止信号
if isStopped() {
break
}
if attempt > 0 { if attempt > 0 {
logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", ownerLogPrefix, attempt+1), owner.Email, "team") logger.Warning(fmt.Sprintf("%s 入库重试 (第%d次)", ownerLogPrefix, attempt+1), owner.Email, "team")
} }