diff --git a/backend/internal/api/error_cleaner.go b/backend/internal/api/error_cleaner.go index d02fadb..0bd2704 100644 --- a/backend/internal/api/error_cleaner.go +++ b/backend/internal/api/error_cleaner.go @@ -1,11 +1,15 @@ package api import ( + "encoding/json" "fmt" + "io" + "net/http" "strconv" "sync" "time" + "codex-pool/internal/config" "codex-pool/internal/database" "codex-pool/internal/logger" ) @@ -64,6 +68,128 @@ func StopErrorCleanerService() { } } +// getTotalAccountCount 获取 S2A 总账号数 +func getTotalAccountCount() (int, error) { + if config.Global == nil || config.Global.S2AApiBase == "" { + return 0, fmt.Errorf("S2A 配置未设置") + } + + client := &http.Client{Timeout: 30 * time.Second} + url := fmt.Sprintf("%s/api/v1/admin/dashboard", config.Global.S2AApiBase) + + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return 0, err + } + setS2AHeaders(req) + + resp, err := client.Do(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + body, _ := io.ReadAll(resp.Body) + + var result struct { + Code int `json:"code"` + Data struct { + TotalAccounts int `json:"total_accounts"` + } `json:"data"` + } + if err := json.Unmarshal(body, &result); err != nil { + return 0, err + } + + return result.Data.TotalAccounts, nil +} + +// triggerAutoRefill 触发自动补号 +func triggerAutoRefill(count int) { + if database.Instance == nil { + return + } + + // 检查是否有待处理的母号 + pendingOwners, err := database.Instance.GetPendingOwners() + if err != nil || len(pendingOwners) == 0 { + logger.Warning("自动补号跳过: 无可用母号", "", "cleaner") + return + } + + // 每个母号产生 4 个账号(默认),计算需要多少母号 + membersPerTeam := 4 + if val, _ := database.Instance.GetConfig("monitor_members_per_team"); val != "" { + if v, err := strconv.Atoi(val); err == nil && v > 0 { + membersPerTeam = v + } + } + + // 计算需要处理的母号数量 + needOwners := (count + membersPerTeam - 1) / membersPerTeam + if needOwners > len(pendingOwners) { + needOwners = len(pendingOwners) + } + if needOwners < 1 { + needOwners = 1 + } + + // 读取补号配置 + concurrentTeams := 2 + if val, _ := database.Instance.GetConfig("monitor_concurrent_teams"); val != "" { + if v, err := strconv.Atoi(val); err == nil && v > 0 { + concurrentTeams = v + } + } + s2aConcurrency := 2 + if val, _ := database.Instance.GetConfig("monitor_s2a_concurrency"); val != "" { + if v, err := strconv.Atoi(val); err == nil && v > 0 { + s2aConcurrency = v + } + } + browserType := "chromedp" + if val, _ := database.Instance.GetConfig("monitor_browser_type"); val != "" { + browserType = val + } + useProxy := false + if val, _ := database.Instance.GetConfig("monitor_replenish_use_proxy"); val == "true" { + useProxy = true + } + + proxy := "" + if useProxy { + proxy = config.Global.GetProxy() + } + + logger.Info(fmt.Sprintf("自动补号启动: 处理 %d 个母号,预计补充 %d 个账号", needOwners, needOwners*membersPerTeam), "", "cleaner") + + // 构建请求 + owners := make([]TeamOwner, 0, needOwners) + for i := 0; i < needOwners && i < len(pendingOwners); i++ { + owners = append(owners, TeamOwner{ + Email: pendingOwners[i].Email, + Password: pendingOwners[i].Password, + Token: pendingOwners[i].Token, + AccountID: pendingOwners[i].AccountID, + }) + } + + req := TeamProcessRequest{ + Owners: owners, + MembersPerTeam: membersPerTeam, + ConcurrentTeams: concurrentTeams, + ConcurrentS2A: s2aConcurrency, + BrowserType: browserType, + Headless: true, + Proxy: proxy, + IncludeOwner: false, + ProcessCount: needOwners, + } + + // 异步启动处理(避免阻塞清理服务) + go runTeamProcess(req) +} + // checkAndCleanErrors 检查配置并清理错误账号 func checkAndCleanErrors() { if database.Instance == nil { @@ -79,6 +205,13 @@ func checkAndCleanErrors() { return } + // 获取清理前的总账号数 + totalBefore, err := getTotalAccountCount() + if err != nil { + logger.Warning(fmt.Sprintf("获取总账号数失败: %v", err), "", "cleaner") + totalBefore = 0 + } + // 获取错误账号列表 errorAccounts, err := fetchAllErrorAccounts() if err != nil { @@ -110,6 +243,34 @@ func checkAndCleanErrors() { lastCleanTime = time.Now() logger.Success(fmt.Sprintf("定期清理错误账号完成: 成功=%d, 失败=%d, 总数=%d", success, failed, len(errorAccounts)), "", "cleaner") + + // 检查是否需要触发自动补号 + autoRefillEnabled := false + if val, _ := database.Instance.GetConfig("auto_refill_on_cleanup"); val == "true" { + autoRefillEnabled = true + } + + if autoRefillEnabled && totalBefore > 0 && success > 0 { + // 阈值默认 25% + threshold := 25 + if val, _ := database.Instance.GetConfig("auto_refill_threshold"); val != "" { + if v, err := strconv.Atoi(val); err == nil && v > 0 && v <= 100 { + threshold = v + } + } + + // 计算是否达到阈值 + thresholdCount := totalBefore * threshold / 100 + if thresholdCount < 1 { + thresholdCount = 1 + } + + if success >= thresholdCount { + logger.Info(fmt.Sprintf("清理触发自动补号: 已删除 %d 个 (总数 %d,阈值 %d%%),将补充 %d 个账号", + success, totalBefore, threshold, success), "", "cleaner") + triggerAutoRefill(success) + } + } } // GetCleanerStatus 获取清理服务状态 @@ -119,6 +280,9 @@ func GetCleanerStatus() map[string]interface{} { enabled := false interval := 3600 + autoRefill := false + autoRefillThreshold := 25 + if database.Instance != nil { if val, _ := database.Instance.GetConfig("error_clean_enabled"); val == "true" { enabled = true @@ -128,12 +292,22 @@ func GetCleanerStatus() map[string]interface{} { interval = v } } + if val, _ := database.Instance.GetConfig("auto_refill_on_cleanup"); val == "true" { + autoRefill = true + } + if val, _ := database.Instance.GetConfig("auto_refill_threshold"); val != "" { + if v, err := strconv.Atoi(val); err == nil { + autoRefillThreshold = v + } + } } return map[string]interface{}{ - "running": cleanerRunning, - "enabled": enabled, - "interval": interval, - "last_clean_time": lastCleanTime.Format(time.RFC3339), + "running": cleanerRunning, + "enabled": enabled, + "interval": interval, + "last_clean_time": lastCleanTime.Format(time.RFC3339), + "auto_refill": autoRefill, + "auto_refill_threshold": autoRefillThreshold, } } diff --git a/backend/internal/database/sqlite.go b/backend/internal/database/sqlite.go index e32de35..1772697 100644 --- a/backend/internal/database/sqlite.go +++ b/backend/internal/database/sqlite.go @@ -147,6 +147,20 @@ func (d *DB) createTables() error { ); CREATE INDEX IF NOT EXISTS idx_codex_auth_proxies_enabled ON codex_auth_proxies(is_enabled); + + -- 日志持久化表 + CREATE TABLE IF NOT EXISTS app_logs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp DATETIME NOT NULL, + level TEXT NOT NULL, + message TEXT NOT NULL, + email TEXT, + module TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_app_logs_timestamp ON app_logs(timestamp); + CREATE INDEX IF NOT EXISTS idx_app_logs_module ON app_logs(module); + CREATE INDEX IF NOT EXISTS idx_app_logs_level ON app_logs(level); `) return err } @@ -1008,6 +1022,186 @@ func (d *DB) GetCodexProxyStats() map[string]int { return stats } +// ======================== +// 日志持久化相关方法 +// ======================== + +// LogEntry 日志条目 +type LogEntry struct { + ID int64 `json:"id"` + Timestamp time.Time `json:"timestamp"` + Level string `json:"level"` + Message string `json:"message"` + Email string `json:"email,omitempty"` + Module string `json:"module,omitempty"` +} + +// InsertLog 插入日志 +func (d *DB) InsertLog(timestamp time.Time, level, message, email, module string) error { + _, err := d.db.Exec(` + INSERT INTO app_logs (timestamp, level, message, email, module) + VALUES (?, ?, ?, ?, ?) + `, timestamp, level, message, email, module) + return err +} + +// GetLogs 获取最近的日志 +func (d *DB) GetLogs(limit int) ([]LogEntry, error) { + if limit <= 0 { + limit = 100 + } + if limit > 1000 { + limit = 1000 + } + + rows, err := d.db.Query(` + SELECT id, timestamp, level, message, COALESCE(email, ''), module + FROM app_logs + ORDER BY timestamp DESC + LIMIT ? + `, limit) + if err != nil { + return nil, err + } + defer rows.Close() + + var logs []LogEntry + for rows.Next() { + var log LogEntry + if err := rows.Scan(&log.ID, &log.Timestamp, &log.Level, &log.Message, &log.Email, &log.Module); err != nil { + continue + } + logs = append(logs, log) + } + + // 反转顺序(让最新的在后面,符合日志显示习惯) + for i, j := 0, len(logs)-1; i < j; i, j = i+1, j-1 { + logs[i], logs[j] = logs[j], logs[i] + } + + return logs, nil +} + +// GetLogsByModule 按模块获取日志 +func (d *DB) GetLogsByModule(module string, page, pageSize int) ([]LogEntry, int, error) { + if page < 1 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + + // 获取总数 + var total int + d.db.QueryRow("SELECT COUNT(*) FROM app_logs WHERE module = ?", module).Scan(&total) + + offset := (page - 1) * pageSize + rows, err := d.db.Query(` + SELECT id, timestamp, level, message, COALESCE(email, ''), module + FROM app_logs + WHERE module = ? + ORDER BY timestamp DESC + LIMIT ? OFFSET ? + `, module, pageSize, offset) + if err != nil { + return nil, 0, err + } + defer rows.Close() + + var logs []LogEntry + for rows.Next() { + var log LogEntry + if err := rows.Scan(&log.ID, &log.Timestamp, &log.Level, &log.Message, &log.Email, &log.Module); err != nil { + continue + } + logs = append(logs, log) + } + + return logs, total, nil +} + +// GetLogsByModuleAndLevel 按模块和级别获取日志 +func (d *DB) GetLogsByModuleAndLevel(module, level string, page, pageSize int) ([]LogEntry, int, error) { + if page < 1 { + page = 1 + } + if pageSize <= 0 { + pageSize = 20 + } + + // 构建查询 + countQuery := "SELECT COUNT(*) FROM app_logs WHERE module = ?" + selectQuery := ` + SELECT id, timestamp, level, message, COALESCE(email, ''), module + FROM app_logs + WHERE module = ?` + args := []interface{}{module} + + if level != "" { + countQuery += " AND level = ?" + selectQuery += " AND level = ?" + args = append(args, level) + } + + // 获取总数 + var total int + d.db.QueryRow(countQuery, args...).Scan(&total) + + offset := (page - 1) * pageSize + selectQuery += " ORDER BY timestamp DESC LIMIT ? OFFSET ?" + args = append(args, pageSize, offset) + + rows, err := d.db.Query(selectQuery, args...) + if err != nil { + return nil, 0, err + } + defer rows.Close() + + var logs []LogEntry + for rows.Next() { + var log LogEntry + if err := rows.Scan(&log.ID, &log.Timestamp, &log.Level, &log.Message, &log.Email, &log.Module); err != nil { + continue + } + logs = append(logs, log) + } + + return logs, total, nil +} + +// ClearLogs 清空所有日志 +func (d *DB) ClearLogs() error { + _, err := d.db.Exec("DELETE FROM app_logs") + return err +} + +// ClearLogsByModule 按模块清空日志 +func (d *DB) ClearLogsByModule(module string) (int64, error) { + result, err := d.db.Exec("DELETE FROM app_logs WHERE module = ?", module) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +// CleanupOldLogs 清理旧日志(保留最近 N 条) +func (d *DB) CleanupOldLogs(keepCount int) (int64, error) { + if keepCount <= 0 { + keepCount = 5000 + } + + result, err := d.db.Exec(` + DELETE FROM app_logs + WHERE id NOT IN ( + SELECT id FROM app_logs ORDER BY timestamp DESC LIMIT ? + ) + `, keepCount) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + // Close 关闭数据库 func (d *DB) Close() error { if d.db != nil { diff --git a/backend/internal/logger/logger.go b/backend/internal/logger/logger.go index 7295c98..01d3cd1 100644 --- a/backend/internal/logger/logger.go +++ b/backend/internal/logger/logger.go @@ -6,6 +6,8 @@ import ( "strings" "sync" "time" + + "codex-pool/internal/database" ) // LogEntry 日志条目 @@ -17,7 +19,7 @@ type LogEntry struct { Module string `json:"module,omitempty"` } -// 日志存储 +// 日志存储(内存缓存 + 数据库持久化) var ( logs = make([]LogEntry, 0, 1000) logsMu sync.RWMutex @@ -80,6 +82,7 @@ func log(level, message, email, module string) { Module: module, } + // 1. 内存缓存 logsMu.Lock() if len(logs) >= 1000 { logs = logs[100:] @@ -87,8 +90,16 @@ func log(level, message, email, module string) { logs = append(logs, entry) logsMu.Unlock() + // 2. 广播给监听器 broadcast(entry) + // 3. 持久化到数据库(异步,避免阻塞) + go func() { + if database.Instance != nil { + database.Instance.InsertLog(entry.Timestamp, level, message, email, module) + } + }() + // 打印到控制台 (带时间戳和颜色) timestamp := entry.Timestamp.Format("15:04:05") @@ -259,11 +270,16 @@ func GetLogsByModule(module string, page, pageSize int) ([]LogEntry, int) { return filtered[start:end], total } -// ClearLogs 清空日志 +// ClearLogs 清空日志(内存 + 数据库) func ClearLogs() { logsMu.Lock() defer logsMu.Unlock() logs = make([]LogEntry, 0, 1000) + + // 同时清空数据库 + if database.Instance != nil { + database.Instance.ClearLogs() + } } // GetLogsByModuleAndLevel 按模块和级别筛选日志并分页(最新的在前) @@ -303,12 +319,12 @@ func GetLogsByModuleAndLevel(module, level string, page, pageSize int) ([]LogEnt return filtered[start:end], total } -// ClearLogsByModule 按模块清除日志 +// ClearLogsByModule 按模块清除日志(内存 + 数据库) func ClearLogsByModule(module string) int { logsMu.Lock() defer logsMu.Unlock() - // 过滤掉指定模块的日志 + // 过滤掉指定模块的日志(内存) var newLogs []LogEntry cleared := 0 for _, log := range logs { @@ -319,5 +335,14 @@ func ClearLogsByModule(module string) int { } } logs = newLogs + + // 同时清空数据库 + if database.Instance != nil { + dbCleared, _ := database.Instance.ClearLogsByModule(module) + if dbCleared > int64(cleared) { + cleared = int(dbCleared) + } + } + return cleared } diff --git a/chatgpt-owner-demote b/chatgpt-owner-demote new file mode 160000 index 0000000..861cecb --- /dev/null +++ b/chatgpt-owner-demote @@ -0,0 +1 @@ +Subproject commit 861cecbf4204fabecfd8f6848d67fc729a23836b