From 634b493524625929ff6a89e6e8e8a792b37fe23e Mon Sep 17 00:00:00 2001 From: kyx236 Date: Sat, 31 Jan 2026 02:24:08 +0800 Subject: [PATCH] feat: Implement S2A account cleaner management and a global application configuration context, with supporting backend API. --- backend/internal/api/team_process.go | 261 ++++++++++++++++------ frontend/src/components/LiveLogViewer.tsx | 49 ++-- frontend/src/context/ConfigContext.tsx | 5 + frontend/src/pages/Cleaner.tsx | 27 +++ 4 files changed, 251 insertions(+), 91 deletions(-) diff --git a/backend/internal/api/team_process.go b/backend/internal/api/team_process.go index e999182..abb26f7 100644 --- a/backend/internal/api/team_process.go +++ b/backend/internal/api/team_process.go @@ -286,10 +286,10 @@ func runTeamProcess(req TeamProcessRequest) { } // processSingleTeam 处理单个 Team -func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { +func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResult) { startTime := time.Now() owner := req.Owners[idx] - result := TeamProcessResult{ + result = TeamProcessResult{ TeamIndex: idx + 1, OwnerEmail: owner.Email, MemberEmails: make([]string, 0), @@ -298,6 +298,20 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { // 固定宽度的 Team 编号 (支持到 Team 99) logPrefix := fmt.Sprintf("[Team %2d]", idx+1) + + // panic 恢复,防止单个 Team 处理崩溃影响整个任务 + defer func() { + if r := recover(); r != nil { + logger.Error(fmt.Sprintf("%s 处理异常: %v", logPrefix, r), owner.Email, "team") + result.Errors = append(result.Errors, fmt.Sprintf("处理异常: %v", r)) + result.DurationMs = time.Since(startTime).Milliseconds() + // 恢复为 valid 允许重试 + if database.Instance != nil { + database.Instance.MarkOwnerAsFailed(owner.Email) + } + } + }() + logger.Info(fmt.Sprintf("%s 开始处理 | 母号: %s", logPrefix, owner.Email), owner.Email, "team") // 标记 owner 为处理中 @@ -331,9 +345,21 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { var err error teamID, err = inviter.GetAccountID() if err != nil { + errStr := err.Error() result.Errors = append(result.Errors, fmt.Sprintf("获取 Team ID 失败: %v", err)) result.DurationMs = time.Since(startTime).Milliseconds() logger.Error(fmt.Sprintf("%s Failed to get Team ID: %v", logPrefix, err), owner.Email, "team") + + // Token 过期或无效,标记为 invalid 不再重试 + if strings.Contains(errStr, "401") || strings.Contains(errStr, "403") || + strings.Contains(errStr, "unauthorized") || strings.Contains(errStr, "invalid") { + logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team") + if database.Instance != nil { + database.Instance.MarkOwnerAsInvalid(owner.Email) + } + return result + } + markOwnerResult(false) return result } @@ -341,15 +367,26 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } result.TeamID = teamID - // Step 2: 测试邀请功能(检测 Team 是否被封禁) + // Step 2: 测试邀请功能(检测 Team 是否被封禁或已满) testEmail := mail.GenerateEmail() if err := inviter.SendInvites([]string{testEmail}); err != nil { - // 邀请失败,可能是 Team 被封禁 errStr := err.Error() + + // 检测 Team 已达邀请上限(401 或 maximum number of seats) + if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") { + logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team") + if database.Instance != nil { + database.Instance.MarkOwnerAsUsed(owner.Email) + } + result.Errors = append(result.Errors, "Team 邀请已满") + result.DurationMs = time.Since(startTime).Milliseconds() + return result + } + + // 检测 Team 被封禁 if strings.Contains(errStr, "403") || strings.Contains(errStr, "forbidden") || strings.Contains(errStr, "banned") || strings.Contains(errStr, "suspended") || strings.Contains(errStr, "deactivated") { - // Team 被封禁,标记为 invalid logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team") if database.Instance != nil { database.Instance.MarkOwnerAsInvalid(owner.Email) @@ -358,6 +395,7 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { result.DurationMs = time.Since(startTime).Milliseconds() return result } + // 其他邀请错误,继续尝试 logger.Warning(fmt.Sprintf("%s 首次邀请失败,继续尝试: %v", logPrefix, err), owner.Email, "team") } @@ -374,13 +412,32 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { var memberMu sync.Mutex var memberWg sync.WaitGroup + // 共享标志:Team 邀请已满,所有 goroutine 应停止 + var teamExhausted int32 + + // 检查 Team 是否已满的辅助函数 + isTeamExhausted := func() bool { + return atomic.LoadInt32(&teamExhausted) == 1 + } + + // 标记 Team 已满 + markTeamExhausted := func() { + if atomic.CompareAndSwapInt32(&teamExhausted, 0, 1) { + logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team") + if database.Instance != nil { + database.Instance.MarkOwnerAsUsed(owner.Email) + } + } + } + // 注册单个成员的函数(带1次重试) registerMember := func(memberIdx int, email, password string) bool { name := register.GenerateName() birthdate := register.GenerateBirthdate() for attempt := 0; attempt < 2; attempt++ { // 最多尝试2次(首次+1次重试) - if !teamProcessState.Running { + // 检查是否应该停止 + if !teamProcessState.Running || isTeamExhausted() { return false } @@ -398,18 +455,19 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { errStr := err.Error() logger.Error(fmt.Sprintf("%s [成员 %d] 邀请失败: %v", logPrefix, memberIdx+1, err), currentEmail, "team") - // 检测 Team 已达邀请上限 - if strings.Contains(errStr, "maximum number of seats") { - logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用", logPrefix), owner.Email, "team") - if database.Instance != nil { - database.Instance.MarkOwnerAsUsed(owner.Email) - } - // 跳出重试,该成员不再处理 + // 检测 Team 已达邀请上限(401 或 maximum number of seats) + if strings.Contains(errStr, "401") || strings.Contains(errStr, "maximum number of seats") { + markTeamExhausted() return false } continue } + // 再次检查是否应该停止(邀请期间其他 goroutine 可能已标记) + if isTeamExhausted() { + return false + } + // 注册 _, err := registerWithTimeout(currentEmail, currentPassword, name, birthdate, req.Proxy) if err != nil { @@ -433,6 +491,10 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { memberWg.Add(1) go func(idx int) { defer memberWg.Done() + // 检查是否应该停止 + if isTeamExhausted() { + return + } email := mail.GenerateEmail() password := register.GeneratePassword() logger.Info(fmt.Sprintf("%s [成员 %d] 邮箱: %s", logPrefix, idx+1, email), email, "team") @@ -441,6 +503,13 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } memberWg.Wait() + // 如果 Team 已满,直接跳过补救和后续处理 + if isTeamExhausted() { + result.Errors = append(result.Errors, "Team 邀请已满") + result.DurationMs = time.Since(startTime).Milliseconds() + return result + } + // 统计失败的成员 failedSlots := make([]int, 0) for i, c := range children { @@ -449,9 +518,9 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } } - // 第二轮:Team 有 4 次额外补救机会 + // 第二轮:Team 有 4 次额外补救机会(如果 Team 未满) teamRetries := 4 - for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running; retry++ { + for retry := 0; retry < teamRetries && len(failedSlots) > 0 && teamProcessState.Running && !isTeamExhausted(); retry++ { slotIdx := failedSlots[0] logger.Warning(fmt.Sprintf("%s [补救 %d/%d] 尝试补充成员 %d", logPrefix, retry+1, teamRetries, slotIdx+1), owner.Email, "team") @@ -462,6 +531,13 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } } + // 补救后再次检查 Team 是否已满 + if isTeamExhausted() { + result.Errors = append(result.Errors, "Team 邀请已满") + result.DurationMs = time.Since(startTime).Milliseconds() + return result + } + // 统计注册成功数 registeredChildren := make([]MemberAccount, 0) for _, c := range children { @@ -477,59 +553,88 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { } logger.Info(fmt.Sprintf("%s 注册完成: %d/%d 成功", logPrefix, result.Registered, req.MembersPerTeam), owner.Email, "team") - // Step 4: S2A 授权入库(成员) + // 如果没有任何成员注册成功,跳过入库步骤 + if len(registeredChildren) == 0 { + logger.Warning(fmt.Sprintf("%s 没有成员注册成功,跳过入库步骤", logPrefix), owner.Email, "team") + result.DurationMs = time.Since(startTime).Milliseconds() + markOwnerResult(false) + return result + } + + // Step 4: S2A 授权入库(成员)- 带重试 for i, child := range registeredChildren { if !teamProcessState.Running { break } - s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) - if err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("Member %d auth URL: %v", i+1, err)) - continue + var s2aSuccess bool + for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 + if attempt > 0 { + logger.Warning(fmt.Sprintf("%s [成员 %d] 入库重试...", logPrefix, i+1), child.Email, "team") + } + + s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) + if err != nil { + logger.Error(fmt.Sprintf("%s [成员 %d] 获取授权URL失败: %v", logPrefix, i+1, err), child.Email, "team") + continue + } + + // 根据配置选择浏览器自动化 + var code string + if req.BrowserType == "rod" { + code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy) + } else { + code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy) + } + if err != nil { + logger.Error(fmt.Sprintf("%s [成员 %d] 浏览器授权失败: %v", logPrefix, i+1, err), child.Email, "team") + continue + } + + // 提交到 S2A + _, err = auth.SubmitS2AOAuth( + config.Global.S2AApiBase, + config.Global.S2AAdminKey, + s2aResp.Data.SessionID, + code, + child.Email, + config.Global.Concurrency, + config.Global.Priority, + config.Global.GroupIDs, + config.Global.ProxyID, + ) + if err != nil { + logger.Error(fmt.Sprintf("%s [成员 %d] S2A提交失败: %v", logPrefix, i+1, err), child.Email, "team") + continue + } + + s2aSuccess = true + result.AddedToS2A++ + logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team") + break } - // 根据配置选择浏览器自动化 - var code string - if req.BrowserType == "rod" { - code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy) - } else { - code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, child.Email, child.Password, teamID, req.Headless, req.Proxy) + if !s2aSuccess { + result.Errors = append(result.Errors, fmt.Sprintf("成员 %d 入库失败", i+1)) } - if err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("Member %d browser: %v", i+1, err)) - continue - } - - // 提交到 S2A - _, err = auth.SubmitS2AOAuth( - config.Global.S2AApiBase, - config.Global.S2AAdminKey, - s2aResp.Data.SessionID, - code, - child.Email, - config.Global.Concurrency, - config.Global.Priority, - config.Global.GroupIDs, - config.Global.ProxyID, - ) - if err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("Member %d S2A: %v", i+1, err)) - continue - } - - result.AddedToS2A++ - logger.Success(fmt.Sprintf("%s [成员 %d] ✓ 入库成功", logPrefix, i+1), child.Email, "team") } - // Step 5: 母号也入库(如果开启) + // Step 5: 母号也入库(如果开启)- 带重试 if req.IncludeOwner && teamProcessState.Running { logger.Info(fmt.Sprintf("%s 开始将母号入库到 S2A", logPrefix), owner.Email, "team") - s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) - if err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("Owner auth URL: %v", err)) - } else { + var ownerSuccess bool + for attempt := 0; attempt < 2; attempt++ { // 最多重试1次 + if attempt > 0 { + logger.Warning(fmt.Sprintf("%s [母号] 入库重试...", logPrefix), owner.Email, "team") + } + + s2aResp, err := auth.GenerateS2AAuthURL(config.Global.S2AApiBase, config.Global.S2AAdminKey, config.Global.ProxyID) + if err != nil { + logger.Error(fmt.Sprintf("%s [母号] 获取授权URL失败: %v", logPrefix, err), owner.Email, "team") + continue + } + var code string if req.BrowserType == "rod" { code, err = auth.CompleteWithRod(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy) @@ -537,26 +642,34 @@ func processSingleTeam(idx int, req TeamProcessRequest) TeamProcessResult { code, err = auth.CompleteWithChromedp(s2aResp.Data.AuthURL, owner.Email, owner.Password, teamID, req.Headless, req.Proxy) } if err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("Owner browser: %v", err)) - } else { - _, err = auth.SubmitS2AOAuth( - config.Global.S2AApiBase, - config.Global.S2AAdminKey, - s2aResp.Data.SessionID, - code, - owner.Email, - config.Global.Concurrency, - config.Global.Priority, - config.Global.GroupIDs, - config.Global.ProxyID, - ) - if err != nil { - result.Errors = append(result.Errors, fmt.Sprintf("Owner S2A: %v", err)) - } else { - result.AddedToS2A++ - logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team") - } + logger.Error(fmt.Sprintf("%s [母号] 浏览器授权失败: %v", logPrefix, err), owner.Email, "team") + continue } + + _, err = auth.SubmitS2AOAuth( + config.Global.S2AApiBase, + config.Global.S2AAdminKey, + s2aResp.Data.SessionID, + code, + owner.Email, + config.Global.Concurrency, + config.Global.Priority, + config.Global.GroupIDs, + config.Global.ProxyID, + ) + if err != nil { + logger.Error(fmt.Sprintf("%s [母号] S2A提交失败: %v", logPrefix, err), owner.Email, "team") + continue + } + + ownerSuccess = true + result.AddedToS2A++ + logger.Success(fmt.Sprintf("%s [母号 ] ✓ 入库成功", logPrefix), owner.Email, "team") + break + } + + if !ownerSuccess { + result.Errors = append(result.Errors, "母号入库失败") } } diff --git a/frontend/src/components/LiveLogViewer.tsx b/frontend/src/components/LiveLogViewer.tsx index f999d58..4fa0a07 100644 --- a/frontend/src/components/LiveLogViewer.tsx +++ b/frontend/src/components/LiveLogViewer.tsx @@ -1,4 +1,4 @@ -import { useState, useEffect, useRef, useCallback } from 'react' +import { useState, useEffect, useRef } from 'react' import { Terminal, Play, Pause, Trash2, ChevronDown } from 'lucide-react' interface LogEntry { @@ -43,11 +43,34 @@ export default function LiveLogViewer({ const eventSourceRef = useRef(null) const pausedLogsRef = useRef([]) - // 连接 SSE - const connect = useCallback(() => { - if (eventSourceRef.current) { - eventSourceRef.current.close() + const isPausedRef = useRef(isPaused) + isPausedRef.current = isPaused + + // 加载历史日志 + const loadHistoryLogs = async () => { + try { + const res = await fetch('/api/logs') + const data = await res.json() + if (data.code === 0 && Array.isArray(data.data)) { + // 转换为 LogEntry 格式 + const historyLogs: LogEntry[] = data.data.map((log: { level: string; message: string; timestamp: string; email?: string; module?: string }) => ({ + type: 'log', + level: log.level || 'info', + message: log.message || '', + timestamp: log.timestamp ? new Date(log.timestamp).toLocaleTimeString('zh-CN') : '', + email: log.email, + module: log.module || 'system', + })) + setLogs(historyLogs.slice(-maxLogs)) + } + } catch (e) { + console.error('Failed to load history logs:', e) } + } + + // 组件挂载时先加载历史日志,再连接 SSE + useEffect(() => { + loadHistoryLogs() const es = new EventSource('/api/logs/stream') eventSourceRef.current = es @@ -61,12 +84,11 @@ export default function LiveLogViewer({ const data = JSON.parse(event.data) as LogEntry if (data.type === 'connected') { - console.log('SSE connected:', data) return } if (data.type === 'log') { - if (isPaused) { + if (isPausedRef.current) { pausedLogsRef.current.push(data) } else { setLogs((prev) => { @@ -82,20 +104,13 @@ export default function LiveLogViewer({ es.onerror = () => { setIsConnected(false) - // 自动重连 - setTimeout(connect, 3000) } - }, [isPaused, maxLogs]) - // 组件挂载时连接 - useEffect(() => { - connect() return () => { - if (eventSourceRef.current) { - eventSourceRef.current.close() - } + es.close() } - }, [connect]) + // eslint-disable-next-line react-hooks/exhaustive-deps + }, []) // 自动滚动 useEffect(() => { diff --git a/frontend/src/context/ConfigContext.tsx b/frontend/src/context/ConfigContext.tsx index f393d8f..966fe43 100644 --- a/frontend/src/context/ConfigContext.tsx +++ b/frontend/src/context/ConfigContext.tsx @@ -61,6 +61,11 @@ export function ConfigProvider({ children }: { children: ReactNode }) { refreshConfig() }, [refreshConfig]) + // 当站点名称变化时,更新浏览器标签页标题 + useEffect(() => { + document.title = siteName + }, [siteName]) + // Update S2A client when config changes and auto-test connection useEffect(() => { if (config.s2a.apiBase && config.s2a.adminKey) { diff --git a/frontend/src/pages/Cleaner.tsx b/frontend/src/pages/Cleaner.tsx index e58afcf..5410c4b 100644 --- a/frontend/src/pages/Cleaner.tsx +++ b/frontend/src/pages/Cleaner.tsx @@ -108,6 +108,30 @@ export default function Cleaner() { } } + // 计算下次清理时间 + const getNextCleanTime = (): string => { + if (!cleanEnabled) return '未启用' + if (!status?.last_clean_time || status.last_clean_time === '0001-01-01T00:00:00Z') { + return '即将执行' + } + try { + const lastTime = new Date(status.last_clean_time) + const nextTime = new Date(lastTime.getTime() + cleanInterval * 1000) + // 如果下次时间已过,说明即将执行 + if (nextTime <= new Date()) { + return '即将执行' + } + return nextTime.toLocaleString('zh-CN', { + month: '2-digit', + day: '2-digit', + hour: '2-digit', + minute: '2-digit' + }) + } catch { + return '未知' + } + } + if (loading) { return (
@@ -193,6 +217,9 @@ export default function Cleaner() {

{formatInterval(cleanInterval)}

+

+ 下次清理: {getNextCleanTime()} +