feat: Initialize core application structure with backend configuration, database, API, and a comprehensive frontend UI for account pooling and management.

This commit is contained in:
2026-01-31 01:48:07 +08:00
parent 74bdcae836
commit 92383f2f20
14 changed files with 776 additions and 47 deletions

View File

@@ -0,0 +1,141 @@
package api
import (
"fmt"
"strconv"
"sync"
"time"
"codex-pool/internal/database"
"codex-pool/internal/logger"
)
var (
cleanerRunning bool
cleanerMu sync.Mutex
cleanerStopChan chan struct{}
lastCleanTime time.Time
)
// StartErrorCleanerService 启动定期清理错误账号服务
func StartErrorCleanerService() {
cleanerMu.Lock()
if cleanerRunning {
cleanerMu.Unlock()
return
}
cleanerRunning = true
cleanerStopChan = make(chan struct{})
cleanerMu.Unlock()
logger.Info("错误账号清理服务已启动(需在配置中启用)", "", "cleaner")
go func() {
for {
// 读取清理间隔配置 (默认 3600 秒 = 1 小时)
cleanInterval := 3600
if database.Instance != nil {
if val, _ := database.Instance.GetConfig("error_clean_interval"); val != "" {
if v, err := strconv.Atoi(val); err == nil && v >= 60 {
cleanInterval = v
}
}
}
select {
case <-cleanerStopChan:
logger.Info("错误账号清理服务已停止", "", "cleaner")
return
case <-time.After(time.Duration(cleanInterval) * time.Second):
checkAndCleanErrors()
}
}
}()
}
// StopErrorCleanerService 停止错误账号清理服务
func StopErrorCleanerService() {
cleanerMu.Lock()
defer cleanerMu.Unlock()
if cleanerRunning && cleanerStopChan != nil {
close(cleanerStopChan)
cleanerRunning = false
}
}
// checkAndCleanErrors 检查配置并清理错误账号
func checkAndCleanErrors() {
if database.Instance == nil {
return
}
// 检查是否启用了自动清理
enabled := false
if val, _ := database.Instance.GetConfig("error_clean_enabled"); val == "true" {
enabled = true
}
if !enabled {
return
}
// 执行清理
logger.Info("开始定期清理错误账号...", "", "cleaner")
errorAccounts, err := fetchAllErrorAccounts()
if err != nil {
logger.Error(fmt.Sprintf("获取错误账号列表失败: %v", err), "", "cleaner")
return
}
if len(errorAccounts) == 0 {
logger.Info("没有错误账号需要清理", "", "cleaner")
lastCleanTime = time.Now()
return
}
logger.Info(fmt.Sprintf("找到 %d 个错误账号,开始删除...", len(errorAccounts)), "", "cleaner")
success := 0
failed := 0
for _, account := range errorAccounts {
err := deleteS2AAccount(account.ID)
if err != nil {
failed++
logger.Warning(fmt.Sprintf("删除账号失败: ID=%d, Email=%s, Error=%v", account.ID, account.Email, err), account.Email, "cleaner")
} else {
success++
logger.Success(fmt.Sprintf("删除账号成功: ID=%d, Email=%s", account.ID, account.Email), account.Email, "cleaner")
}
}
lastCleanTime = time.Now()
logger.Success(fmt.Sprintf("定期清理错误账号完成: 成功=%d, 失败=%d, 总数=%d", success, failed, len(errorAccounts)), "", "cleaner")
}
// GetCleanerStatus 获取清理服务状态
func GetCleanerStatus() map[string]interface{} {
cleanerMu.Lock()
defer cleanerMu.Unlock()
enabled := false
interval := 3600
if database.Instance != nil {
if val, _ := database.Instance.GetConfig("error_clean_enabled"); val == "true" {
enabled = true
}
if val, _ := database.Instance.GetConfig("error_clean_interval"); val != "" {
if v, err := strconv.Atoi(val); err == nil {
interval = v
}
}
}
return map[string]interface{}{
"running": cleanerRunning,
"enabled": enabled,
"interval": interval,
"last_clean_time": lastCleanTime.Format(time.RFC3339),
}
}

View File

@@ -179,10 +179,6 @@ func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
// runTeamProcess 执行 Team 批量处理 - 使用工作池模式
func runTeamProcess(req TeamProcessRequest) {
defer func() {
teamProcessState.Running = false
}()
totalOwners := len(req.Owners)
workerCount := req.ConcurrentTeams // 同时运行的 worker 数量
if workerCount > totalOwners {
@@ -202,6 +198,30 @@ func runTeamProcess(req TeamProcessRequest) {
}
}
// 统计变量(在 defer 中使用)
var totalRegistered, totalAddedToS2A int
var allErrors []string
// 确保任务结束时更新状态和批次记录
defer func() {
teamProcessState.Running = false
// 无论任务是正常完成还是异常中断,都更新批次记录状态
if database.Instance != nil && batchID > 0 {
errorsStr := ""
if len(allErrors) > 0 {
// 只保留前10条错误
if len(allErrors) > 10 {
allErrors = allErrors[:10]
}
errorsStr = fmt.Sprintf("%v", allErrors)
}
if err := database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr); err != nil {
logger.Error(fmt.Sprintf("更新批次记录失败: %v", err), "", "team")
}
}
}()
logger.Info(fmt.Sprintf("开始批量处理: 共 %d 个 Team, 并发数: %d", totalOwners, workerCount), "", "team")
// 任务队列
@@ -243,10 +263,7 @@ func runTeamProcess(req TeamProcessRequest) {
close(resultChan)
}()
// 统计总数
var totalRegistered, totalAddedToS2A int
var allErrors []string
// 收集结果并统计
for result := range resultChan {
teamProcessState.mu.Lock()
teamProcessState.Results = append(teamProcessState.Results, result)
@@ -257,19 +274,6 @@ func runTeamProcess(req TeamProcessRequest) {
allErrors = append(allErrors, result.Errors...)
}
// 更新批次记录
if database.Instance != nil && batchID > 0 {
errorsStr := ""
if len(allErrors) > 0 {
// 只保留前10条错误
if len(allErrors) > 10 {
allErrors = allErrors[:10]
}
errorsStr = fmt.Sprintf("%v", allErrors)
}
database.Instance.UpdateBatchRun(batchID, totalRegistered, totalAddedToS2A, errorsStr)
}
// 计算成功率
expectedTotal := totalOwners * req.MembersPerTeam
successRate := float64(0)

View File

@@ -42,6 +42,9 @@ type Config struct {
AutoPauseOnExpired bool `json:"auto_pause_on_expired"`
AccountsPath string `json:"accounts_path"`
// 站点配置
SiteName string `json:"site_name"`
// 邮箱服务
MailServices []MailServiceConfig `json:"mail_services"`
}

View File

@@ -322,8 +322,9 @@ func (d *DB) GetOwnerStats() map[string]int {
stats := map[string]int{
"total": 0,
"valid": 0,
"registered": 0,
"pooled": 0,
"processing": 0,
"used": 0,
"invalid": 0,
}
var count int
@@ -333,11 +334,14 @@ func (d *DB) GetOwnerStats() map[string]int {
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'valid'").Scan(&count); err == nil {
stats["valid"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'registered'").Scan(&count); err == nil {
stats["registered"] = count
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'processing'").Scan(&count); err == nil {
stats["processing"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'pooled'").Scan(&count); err == nil {
stats["pooled"] = count
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'used'").Scan(&count); err == nil {
stats["used"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'invalid'").Scan(&count); err == nil {
stats["invalid"] = count
}
return stats
@@ -439,6 +443,21 @@ func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) {
return runs, nil
}
// CleanupStuckBatchRuns 清理卡住的 running 状态批次记录
func (d *DB) CleanupStuckBatchRuns() (int64, error) {
result, err := d.db.Exec(`
UPDATE batch_runs
SET status = 'completed',
finished_at = COALESCE(finished_at, started_at),
duration_seconds = 0
WHERE status = 'running'
`)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// GetBatchRunStats 获取批次统计
func (d *DB) GetBatchRunStats() map[string]interface{} {
stats := make(map[string]interface{})