Files
codexautopool/backend/internal/database/sqlite.go
kyx236 02ddb48f21 fix(database): Recalculate batch run metrics from team results on cleanup
- Aggregate total_registered and total_added_to_s2a from batch_team_results table
- Calculate success_rate based on actual S2A additions vs total owners
- Compute duration_seconds from started_at and finished_at timestamps
- Improve stuck batch run recovery by restoring accurate metrics instead of zeroing them out
2026-02-07 02:52:06 +08:00

1263 lines
35 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package database
import (
"database/sql"
"encoding/json"
"fmt"
"strconv"
"strings"
"time"
_ "github.com/mattn/go-sqlite3"
)
// TeamOwner 账号结构
type TeamOwner struct {
ID int64 `json:"id"`
Email string `json:"email"`
Password string `json:"password,omitempty"`
Token string `json:"token,omitempty"`
AccountID string `json:"account_id"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}
// DB 数据库管理器
type DB struct {
db *sql.DB
}
// 全局数据库实例
var Instance *DB
// Init 初始化数据库
func Init(dbPath string) error {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return fmt.Errorf("打开数据库失败: %w", err)
}
Instance = &DB{db: db}
if err := Instance.createTables(); err != nil {
return fmt.Errorf("创建表失败: %w", err)
}
// 执行数据库迁移
if err := Instance.migrate(); err != nil {
fmt.Printf("[数据库] 迁移警告: %v\n", err)
}
fmt.Printf("[数据库] SQLite 已连接: %s\n", dbPath)
return nil
}
// migrate 数据库迁移
func (d *DB) migrate() error {
// 添加 last_checked_at 列(如果不存在)
_, _ = d.db.Exec(`ALTER TABLE team_owners ADD COLUMN last_checked_at DATETIME`)
// 添加 last_test_at 列 (如果不存在)
_, _ = d.db.Exec(`ALTER TABLE codex_auth_proxies ADD COLUMN last_test_at DATETIME`)
// 添加 location 列 (如果不存在)
_, _ = d.db.Exec(`ALTER TABLE codex_auth_proxies ADD COLUMN location TEXT`)
return nil
}
// isColumnExistsError 判断是否是列已存在的错误
func isColumnExistsError(err error) bool {
return err != nil && (err.Error() == "duplicate column name: last_checked_at" ||
err.Error() == "table team_owners already has a column named last_checked_at")
}
// createTables 创建表
func (d *DB) createTables() error {
_, err := d.db.Exec(`
CREATE TABLE IF NOT EXISTS team_owners (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
password TEXT,
token TEXT,
account_id TEXT NOT NULL,
status TEXT DEFAULT 'valid',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_checked_at DATETIME
);
CREATE INDEX IF NOT EXISTS idx_team_owners_email ON team_owners(email);
CREATE INDEX IF NOT EXISTS idx_team_owners_status ON team_owners(status);
CREATE INDEX IF NOT EXISTS idx_team_owners_account_id ON team_owners(account_id);
-- 配置表 (key-value 形式)
CREATE TABLE IF NOT EXISTS app_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- 批次运行记录表
CREATE TABLE IF NOT EXISTS batch_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at DATETIME NOT NULL,
finished_at DATETIME,
total_owners INTEGER DEFAULT 0,
total_registered INTEGER DEFAULT 0,
total_added_to_s2a INTEGER DEFAULT 0,
success_rate REAL DEFAULT 0,
duration_seconds INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
errors TEXT
);
CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at);
-- 批次Team处理结果表
CREATE TABLE IF NOT EXISTS batch_team_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
batch_id INTEGER NOT NULL,
team_index INTEGER NOT NULL,
owner_email TEXT NOT NULL,
team_id TEXT,
registered INTEGER DEFAULT 0,
added_to_s2a INTEGER DEFAULT 0,
member_emails TEXT,
errors TEXT,
duration_ms INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (batch_id) REFERENCES batch_runs(id)
);
CREATE INDEX IF NOT EXISTS idx_batch_team_results_batch_id ON batch_team_results(batch_id);
-- CodexAuth 代理池表
CREATE TABLE IF NOT EXISTS codex_auth_proxies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
proxy_url TEXT NOT NULL UNIQUE,
description TEXT,
is_enabled INTEGER DEFAULT 1,
last_used_at DATETIME,
success_count INTEGER DEFAULT 0,
fail_count INTEGER DEFAULT 0,
location TEXT,
last_test_at DATETIME,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_codex_auth_proxies_enabled ON codex_auth_proxies(is_enabled);
-- 日志持久化表
CREATE TABLE IF NOT EXISTS app_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
timestamp DATETIME NOT NULL,
level TEXT NOT NULL,
message TEXT NOT NULL,
email TEXT,
module TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_app_logs_timestamp ON app_logs(timestamp);
CREATE INDEX IF NOT EXISTS idx_app_logs_module ON app_logs(module);
CREATE INDEX IF NOT EXISTS idx_app_logs_level ON app_logs(level);
`)
return err
}
// GetConfig 获取配置值
func (d *DB) GetConfig(key string) (string, error) {
var value string
err := d.db.QueryRow("SELECT value FROM app_config WHERE key = ?", key).Scan(&value)
if err == sql.ErrNoRows {
return "", nil
}
return value, err
}
// SetConfig 设置配置值
func (d *DB) SetConfig(key, value string) error {
_, err := d.db.Exec(`
INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE SET value = ?, updated_at = CURRENT_TIMESTAMP
`, key, value, value)
return err
}
// GetAllConfig 获取所有配置
func (d *DB) GetAllConfig() (map[string]string, error) {
rows, err := d.db.Query("SELECT key, value FROM app_config")
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]string)
for rows.Next() {
var key, value string
if err := rows.Scan(&key, &value); err != nil {
continue
}
result[key] = value
}
return result, nil
}
// AddTeamOwner 添加 Team Owner
func (d *DB) AddTeamOwner(owner TeamOwner) (int64, error) {
result, err := d.db.Exec(`
INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at)
VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP)
`, owner.Email, owner.Password, owner.Token, owner.AccountID)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// AddTeamOwners 批量添加
func (d *DB) AddTeamOwners(owners []TeamOwner) (int, error) {
tx, err := d.db.Begin()
if err != nil {
return 0, err
}
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at)
VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP)
`)
if err != nil {
return 0, err
}
defer stmt.Close()
count := 0
for _, owner := range owners {
_, err := stmt.Exec(owner.Email, owner.Password, owner.Token, owner.AccountID)
if err != nil {
fmt.Printf("[数据库] 插入失败 %s: %v\n", owner.Email, err)
continue
}
count++
}
if err := tx.Commit(); err != nil {
return 0, err
}
return count, nil
}
// GetTeamOwners 获取列表
func (d *DB) GetTeamOwners(status string, limit, offset int) ([]TeamOwner, int, error) {
query := "SELECT id, email, password, token, account_id, status, created_at, last_checked_at FROM team_owners WHERE 1=1"
countQuery := "SELECT COUNT(*) FROM team_owners WHERE 1=1"
args := []interface{}{}
if status != "" {
query += " AND status = ?"
countQuery += " AND status = ?"
args = append(args, status)
}
var total int
err := d.db.QueryRow(countQuery, args...).Scan(&total)
if err != nil {
return nil, 0, err
}
// 排序:已使用(used, pooled)排前面,其次按创建时间降序
query += " ORDER BY CASE WHEN status IN ('used', 'pooled') THEN 0 ELSE 1 END, created_at DESC LIMIT ? OFFSET ?"
args = append(args, limit, offset)
rows, err := d.db.Query(query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, total, nil
}
// GetPendingOwners 获取待处理(排除已使用 and 处理中的)
func (d *DB) GetPendingOwners() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at, last_checked_at
FROM team_owners WHERE status = 'valid'
ORDER BY created_at ASC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, nil
}
// GetOwnersForBanCheck 获取需要检查封禁状态的母号
func (d *DB) GetOwnersForBanCheck(checkIntervalHours int) ([]TeamOwner, error) {
// 获取 valid 状态且 超过检查间隔 或 从未检查过 的母号
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at, last_checked_at
FROM team_owners
WHERE status = 'valid'
AND (last_checked_at IS NULL OR last_checked_at < datetime('now', ?))
ORDER BY last_checked_at ASC NULLS FIRST, created_at ASC
`, fmt.Sprintf("-%d hours", checkIntervalHours))
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, nil
}
// UpdateOwnerLastCheckedAt 更新母号的最后检查时间
func (d *DB) UpdateOwnerLastCheckedAt(id int64) error {
_, err := d.db.Exec("UPDATE team_owners SET last_checked_at = CURRENT_TIMESTAMP WHERE id = ?", id)
return err
}
// UpdateOwnerLastCheckedAtByEmail 通过邮箱更新母号的最后检查时间
func (d *DB) UpdateOwnerLastCheckedAtByEmail(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET last_checked_at = CURRENT_TIMESTAMP WHERE email = ?", email)
return err
}
// MarkOwnerAsUsed 标记 Owner 为已使用
func (d *DB) MarkOwnerAsUsed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'used' WHERE email = ?", email)
return err
}
// MarkOwnerAsProcessing 标记 Owner 为处理中
func (d *DB) MarkOwnerAsProcessing(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'processing' WHERE email = ?", email)
return err
}
// MarkOwnerAsFailed 标记 Owner 为失败(可重试)
func (d *DB) MarkOwnerAsFailed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'valid' WHERE email = ?", email)
return err
}
// MarkOwnerAsInvalid 标记 Owner 为无效Team 被封禁,永久跳过)
func (d *DB) MarkOwnerAsInvalid(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'invalid' WHERE email = ?", email)
return err
}
// UpdateOwnerStatus 更新状态
func (d *DB) UpdateOwnerStatus(id int64, status string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id)
return err
}
// DeleteTeamOwner 删除
func (d *DB) DeleteTeamOwner(id int64) error {
_, err := d.db.Exec("DELETE FROM team_owners WHERE id = ?", id)
return err
}
// DeleteTeamOwnerByEmail 按邮箱删除母号
func (d *DB) DeleteTeamOwnerByEmail(email string) (int64, error) {
result, err := d.db.Exec("DELETE FROM team_owners WHERE email = ?", email)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// ClearTeamOwners 清空
func (d *DB) ClearTeamOwners() error {
_, err := d.db.Exec("DELETE FROM team_owners")
return err
}
// ClearUsedOwners 清理已使用的母号status = 'used' 或 'pooled'
func (d *DB) ClearUsedOwners() (int64, error) {
result, err := d.db.Exec("DELETE FROM team_owners WHERE status IN ('used', 'pooled')")
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// GetOwnersWithoutAccountID 获取缺少 account_id 的 owners
func (d *DB) GetOwnersWithoutAccountID() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at, last_checked_at
FROM team_owners WHERE account_id = '' OR account_id IS NULL
ORDER BY created_at DESC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, nil
}
// UpdateOwnerAccountID 更新 owner 的 account_id
func (d *DB) UpdateOwnerAccountID(id int64, accountID string) error {
_, err := d.db.Exec("UPDATE team_owners SET account_id = ? WHERE id = ?", accountID, id)
return err
}
// GetTeamOwnerIDs 获取所有符合条件的 owner ID用于全选
func (d *DB) GetTeamOwnerIDs(status string) ([]int64, error) {
query := "SELECT id FROM team_owners WHERE 1=1"
args := []interface{}{}
if status != "" {
query += " AND status = ?"
args = append(args, status)
}
query += " ORDER BY created_at DESC"
rows, err := d.db.Query(query, args...)
if err != nil {
return nil, err
}
defer rows.Close()
var ids []int64
for rows.Next() {
var id int64
if err := rows.Scan(&id); err != nil {
continue
}
ids = append(ids, id)
}
return ids, nil
}
// GetOwnerStats 获取统计
func (d *DB) GetOwnerStats() map[string]int {
stats := map[string]int{
"total": 0,
"valid": 0,
"processing": 0,
"used": 0,
"invalid": 0,
}
var count int
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners").Scan(&count); err == nil {
stats["total"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'valid'").Scan(&count); err == nil {
stats["valid"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'processing'").Scan(&count); err == nil {
stats["processing"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'used'").Scan(&count); err == nil {
stats["used"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'invalid'").Scan(&count); err == nil {
stats["invalid"] = count
}
return stats
}
// BatchRun 批次运行记录
type BatchRun struct {
ID int64 `json:"id"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
TotalOwners int `json:"total_owners"`
TotalRegistered int `json:"total_registered"`
TotalAddedToS2A int `json:"total_added_to_s2a"`
SuccessRate float64 `json:"success_rate"`
DurationSeconds int `json:"duration_seconds"`
Status string `json:"status"`
Errors string `json:"errors,omitempty"`
}
// CreateBatchRun 创建批次记录
func (d *DB) CreateBatchRun(totalOwners int) (int64, error) {
result, err := d.db.Exec(
`INSERT INTO batch_runs (started_at, total_owners, status) VALUES (?, ?, 'running')`,
time.Now(), totalOwners,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// UpdateBatchRun 更新批次记录
func (d *DB) UpdateBatchRun(id int64, registered, addedToS2A int, errors string) error {
finishedAt := time.Now()
// 获取开始时间计算耗时
var startedAt time.Time
var totalOwners int
d.db.QueryRow("SELECT started_at, total_owners FROM batch_runs WHERE id = ?", id).Scan(&startedAt, &totalOwners)
duration := int(finishedAt.Sub(startedAt).Seconds())
// 计算成功率(以入库数为准)
var successRate float64
expectedTotal := totalOwners * 4 // 每个 owner 应产生 4 个成员
if expectedTotal > 0 {
successRate = float64(addedToS2A) / float64(expectedTotal) * 100
}
_, err := d.db.Exec(
`UPDATE batch_runs SET
finished_at = ?,
total_registered = ?,
total_added_to_s2a = ?,
success_rate = ?,
duration_seconds = ?,
status = 'completed',
errors = ?
WHERE id = ?`,
finishedAt, registered, addedToS2A, successRate, duration, errors, id,
)
return err
}
// GetBatchRuns 获取批次记录列表
func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) {
if limit <= 0 {
limit = 50
}
rows, err := d.db.Query(`
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
success_rate, duration_seconds, status, COALESCE(errors, '')
FROM batch_runs
ORDER BY started_at DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var runs []BatchRun
for rows.Next() {
var run BatchRun
var finishedAt sql.NullTime
err := rows.Scan(
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
)
if err != nil {
continue
}
if finishedAt.Valid {
run.FinishedAt = finishedAt.Time
}
runs = append(runs, run)
}
return runs, nil
}
// CleanupStuckBatchRuns 清理卡住的 running 状态批次记录,从子结果重新计算汇总
func (d *DB) CleanupStuckBatchRuns() (int64, error) {
// 先从 batch_team_results 重新汇总数据,再标记为 completed
result, err := d.db.Exec(`
UPDATE batch_runs
SET status = 'completed',
finished_at = COALESCE(finished_at, started_at),
total_registered = COALESCE((
SELECT SUM(registered) FROM batch_team_results WHERE batch_id = batch_runs.id
), 0),
total_added_to_s2a = COALESCE((
SELECT SUM(added_to_s2a) FROM batch_team_results WHERE batch_id = batch_runs.id
), 0),
success_rate = CASE
WHEN total_owners > 0 THEN
COALESCE((SELECT SUM(added_to_s2a) FROM batch_team_results WHERE batch_id = batch_runs.id), 0) * 100.0 / (total_owners * 4)
ELSE 0
END,
duration_seconds = CASE
WHEN finished_at IS NOT NULL THEN CAST((julianday(finished_at) - julianday(started_at)) * 86400 AS INTEGER)
ELSE 0
END
WHERE status = 'running'
`)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// CleanupStuckProcessingOwners 清理卡住的 processing 状态母号,重置为 valid
func (d *DB) CleanupStuckProcessingOwners() (int64, error) {
result, err := d.db.Exec(`UPDATE team_owners SET status = 'valid' WHERE status = 'processing'`)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// GetBatchRunStats 获取批次统计
func (d *DB) GetBatchRunStats() map[string]interface{} {
stats := make(map[string]interface{})
var totalAdded, todayAdded int
var avgRate float64
// 总入库数
d.db.QueryRow("SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs").Scan(&totalAdded)
stats["total_added"] = totalAdded
// 今日入库数
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE DATE(started_at) = DATE('now', 'localtime')
`).Scan(&todayAdded)
stats["today_added"] = todayAdded
// 平均成功率
d.db.QueryRow("SELECT COALESCE(AVG(success_rate), 0) FROM batch_runs WHERE status = 'completed'").Scan(&avgRate)
stats["avg_success_rate"] = avgRate
// 本周入库数
var weekAdded int
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE started_at >= datetime('now', '-7 days')
`).Scan(&weekAdded)
stats["week_added"] = weekAdded
return stats
}
// BatchTeamResult 批次Team处理结果
type BatchTeamResult struct {
ID int64 `json:"id"`
BatchID int64 `json:"batch_id"`
TeamIndex int `json:"team_index"`
OwnerEmail string `json:"owner_email"`
TeamID string `json:"team_id"`
Registered int `json:"registered"`
AddedToS2A int `json:"added_to_s2a"`
MemberEmails []string `json:"member_emails"`
Errors []string `json:"errors"`
DurationMs int64 `json:"duration_ms"`
CreatedAt time.Time `json:"created_at"`
}
// SaveBatchTeamResult 保存单个Team处理结果
func (d *DB) SaveBatchTeamResult(batchID int64, result BatchTeamResult) error {
memberEmailsJSON, _ := json.Marshal(result.MemberEmails)
errorsJSON, _ := json.Marshal(result.Errors)
_, err := d.db.Exec(`
INSERT INTO batch_team_results (batch_id, team_index, owner_email, team_id, registered, added_to_s2a, member_emails, errors, duration_ms)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
`, batchID, result.TeamIndex, result.OwnerEmail, result.TeamID, result.Registered, result.AddedToS2A, string(memberEmailsJSON), string(errorsJSON), result.DurationMs)
return err
}
// GetBatchTeamResults 获取某批次的所有Team结果
func (d *DB) GetBatchTeamResults(batchID int64) ([]BatchTeamResult, error) {
rows, err := d.db.Query(`
SELECT id, batch_id, team_index, owner_email, COALESCE(team_id, ''), registered, added_to_s2a,
COALESCE(member_emails, '[]'), COALESCE(errors, '[]'), duration_ms, created_at
FROM batch_team_results
WHERE batch_id = ?
ORDER BY team_index ASC
`, batchID)
if err != nil {
return nil, err
}
defer rows.Close()
var results []BatchTeamResult
for rows.Next() {
var r BatchTeamResult
var memberEmailsJSON, errorsJSON string
err := rows.Scan(&r.ID, &r.BatchID, &r.TeamIndex, &r.OwnerEmail, &r.TeamID, &r.Registered,
&r.AddedToS2A, &memberEmailsJSON, &errorsJSON, &r.DurationMs, &r.CreatedAt)
if err != nil {
continue
}
json.Unmarshal([]byte(memberEmailsJSON), &r.MemberEmails)
json.Unmarshal([]byte(errorsJSON), &r.Errors)
if r.MemberEmails == nil {
r.MemberEmails = []string{}
}
if r.Errors == nil {
r.Errors = []string{}
}
results = append(results, r)
}
return results, nil
}
// GetBatchRunsWithPagination 分页获取批次记录
func (d *DB) GetBatchRunsWithPagination(page, pageSize int) ([]BatchRun, int, error) {
if page < 1 {
page = 1
}
if pageSize <= 0 {
pageSize = 5
}
// 获取总数
var total int
err := d.db.QueryRow("SELECT COUNT(*) FROM batch_runs").Scan(&total)
if err != nil {
return nil, 0, err
}
offset := (page - 1) * pageSize
rows, err := d.db.Query(`
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
success_rate, duration_seconds, status, COALESCE(errors, '')
FROM batch_runs
ORDER BY started_at DESC
LIMIT ? OFFSET ?
`, pageSize, offset)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var runs []BatchRun
for rows.Next() {
var run BatchRun
var finishedAt sql.NullTime
err := rows.Scan(
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
)
if err != nil {
continue
}
if finishedAt.Valid {
run.FinishedAt = finishedAt.Time
}
runs = append(runs, run)
}
return runs, total, nil
}
// GetBatchRunWithResults 获取批次详情包含Team结果
func (d *DB) GetBatchRunWithResults(batchID int64) (*BatchRun, []BatchTeamResult, error) {
// 获取批次信息
var run BatchRun
var finishedAt sql.NullTime
err := d.db.QueryRow(`
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
success_rate, duration_seconds, status, COALESCE(errors, '')
FROM batch_runs WHERE id = ?
`, batchID).Scan(
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
)
if err != nil {
return nil, nil, err
}
if finishedAt.Valid {
run.FinishedAt = finishedAt.Time
}
// 获取Team结果
results, err := d.GetBatchTeamResults(batchID)
if err != nil {
return &run, nil, err
}
return &run, results, nil
}
// CodexProxy CodexAuth 代理配置
type CodexProxy struct {
ID int64 `json:"id"`
ProxyURL string `json:"proxy_url"`
Description string `json:"description"`
IsEnabled bool `json:"is_enabled"`
LastUsedAt *time.Time `json:"last_used_at,omitempty"`
SuccessCount int `json:"success_count"`
FailCount int `json:"fail_count"`
Location string `json:"location"`
LastTestAt *time.Time `json:"last_test_at,omitempty"`
CreatedAt time.Time `json:"created_at"`
}
// AddCodexProxy 添加代理
func (d *DB) AddCodexProxy(proxyURL, description string) (int64, error) {
result, err := d.db.Exec(`
INSERT INTO codex_auth_proxies (proxy_url, description, is_enabled, created_at)
VALUES (?, ?, 1, CURRENT_TIMESTAMP)
`, proxyURL, description)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// AddCodexProxies 批量添加代理
func (d *DB) AddCodexProxies(proxies []string) (int, error) {
tx, err := d.db.Begin()
if err != nil {
return 0, err
}
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT OR IGNORE INTO codex_auth_proxies (proxy_url, is_enabled, created_at)
VALUES (?, 1, CURRENT_TIMESTAMP)
`)
if err != nil {
return 0, err
}
defer stmt.Close()
count := 0
for _, proxy := range proxies {
result, err := stmt.Exec(proxy)
if err != nil {
continue
}
affected, _ := result.RowsAffected()
if affected > 0 {
count++
}
}
if err := tx.Commit(); err != nil {
return 0, err
}
return count, nil
}
// GetCodexProxies 获取代理列表
func (d *DB) GetCodexProxies() ([]CodexProxy, error) {
rows, err := d.db.Query(`
SELECT id, proxy_url, COALESCE(description, ''), is_enabled, last_used_at, success_count, fail_count, COALESCE(location, ''), last_test_at, created_at
FROM codex_auth_proxies
ORDER BY created_at DESC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var proxies []CodexProxy
for rows.Next() {
var p CodexProxy
var lastUsedAt, lastTestAt sql.NullTime
err := rows.Scan(&p.ID, &p.ProxyURL, &p.Description, &p.IsEnabled, &lastUsedAt, &p.SuccessCount, &p.FailCount, &p.Location, &lastTestAt, &p.CreatedAt)
if err != nil {
continue
}
if lastUsedAt.Valid {
p.LastUsedAt = &lastUsedAt.Time
}
if lastTestAt.Valid {
p.LastTestAt = &lastTestAt.Time
}
proxies = append(proxies, p)
}
return proxies, nil
}
// GetEnabledCodexProxies 获取已启用的代理列表
func (d *DB) GetEnabledCodexProxies() ([]CodexProxy, error) {
rows, err := d.db.Query(`
SELECT id, proxy_url, COALESCE(description, ''), is_enabled, last_used_at, success_count, fail_count, COALESCE(location, ''), last_test_at, created_at
FROM codex_auth_proxies
WHERE is_enabled = 1
ORDER BY success_count DESC, fail_count ASC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var proxies []CodexProxy
for rows.Next() {
var p CodexProxy
var lastUsedAt, lastTestAt sql.NullTime
err := rows.Scan(&p.ID, &p.ProxyURL, &p.Description, &p.IsEnabled, &lastUsedAt, &p.SuccessCount, &p.FailCount, &p.Location, &lastTestAt, &p.CreatedAt)
if err != nil {
continue
}
if lastUsedAt.Valid {
p.LastUsedAt = &lastUsedAt.Time
}
if lastTestAt.Valid {
p.LastTestAt = &lastTestAt.Time
}
proxies = append(proxies, p)
}
return proxies, nil
}
// GetRandomCodexProxy 随机获取一个已启用的代理
func (d *DB) GetRandomCodexProxy() (string, error) {
var proxyURL string
err := d.db.QueryRow(`
SELECT proxy_url FROM codex_auth_proxies
WHERE is_enabled = 1
ORDER BY RANDOM()
LIMIT 1
`).Scan(&proxyURL)
if err == sql.ErrNoRows {
return "", nil
}
if err != nil {
return "", err
}
// 更新最后使用时间
d.db.Exec("UPDATE codex_auth_proxies SET last_used_at = CURRENT_TIMESTAMP WHERE proxy_url = ?", proxyURL)
return proxyURL, nil
}
// UpdateCodexProxyStats 更新代理统计
func (d *DB) UpdateCodexProxyStats(proxyURL string, success bool) error {
if success {
_, err := d.db.Exec("UPDATE codex_auth_proxies SET success_count = success_count + 1, last_used_at = CURRENT_TIMESTAMP WHERE proxy_url = ?", proxyURL)
return err
}
_, err := d.db.Exec("UPDATE codex_auth_proxies SET fail_count = fail_count + 1, last_used_at = CURRENT_TIMESTAMP WHERE proxy_url = ?", proxyURL)
return err
}
// UpdateCodexProxyTestResult 更新代理测试结果
func (d *DB) UpdateCodexProxyTestResult(id int64, location string, success bool) error {
if success {
_, err := d.db.Exec(`
UPDATE codex_auth_proxies
SET location = ?, last_test_at = CURRENT_TIMESTAMP, success_count = success_count + 1
WHERE id = ?
`, location, id)
return err
}
_, err := d.db.Exec(`
UPDATE codex_auth_proxies
SET last_test_at = CURRENT_TIMESTAMP, fail_count = fail_count + 1
WHERE id = ?
`, id)
return err
}
// ToggleCodexProxy 切换代理启用状态
func (d *DB) ToggleCodexProxy(id int64) error {
_, err := d.db.Exec("UPDATE codex_auth_proxies SET is_enabled = 1 - is_enabled WHERE id = ?", id)
return err
}
// DeleteCodexProxy 删除代理
func (d *DB) DeleteCodexProxy(id int64) error {
_, err := d.db.Exec("DELETE FROM codex_auth_proxies WHERE id = ?", id)
return err
}
// ClearCodexProxies 清空所有代理
func (d *DB) ClearCodexProxies() error {
_, err := d.db.Exec("DELETE FROM codex_auth_proxies")
return err
}
// GetCodexProxyByID 获取指定 ID 的代理地址
func (d *DB) GetCodexProxyByID(id int64) (string, error) {
var proxyURL string
err := d.db.QueryRow("SELECT proxy_url FROM codex_auth_proxies WHERE id = ?", id).Scan(&proxyURL)
if err == sql.ErrNoRows {
return "", nil
}
return proxyURL, err
}
// ResolveProxy 解析代理字符串(支持 pool:random, pool:id:N, 或直接 URL
func (d *DB) ResolveProxy(proxyStr string) string {
if proxyStr == "" {
return ""
}
// 兼容旧的 [RANDOM] 格式
if proxyStr == "pool:random" || proxyStr == "[RANDOM]" {
p, _ := d.GetRandomCodexProxy()
return p
}
// 处理 pool:id:N 格式
if strings.HasPrefix(proxyStr, "pool:id:") {
idStr := strings.TrimPrefix(proxyStr, "pool:id:")
id, err := strconv.ParseInt(idStr, 10, 64)
if err == nil {
p, _ := d.GetCodexProxyByID(id)
return p
}
}
// 否则视为字面 URL如果没协议头加一个保持现有行为
if !strings.HasPrefix(proxyStr, "http://") && !strings.HasPrefix(proxyStr, "https://") && !strings.HasPrefix(proxyStr, "socks5://") {
return "http://" + proxyStr
}
return proxyStr
}
// GetCodexProxyStats 获取代理统计
func (d *DB) GetCodexProxyStats() map[string]int {
stats := map[string]int{
"total": 0,
"enabled": 0,
"disabled": 0,
}
var total, enabled, disabled int
d.db.QueryRow("SELECT COUNT(*) FROM codex_auth_proxies").Scan(&total)
d.db.QueryRow("SELECT COUNT(*) FROM codex_auth_proxies WHERE is_enabled = 1").Scan(&enabled)
d.db.QueryRow("SELECT COUNT(*) FROM codex_auth_proxies WHERE is_enabled = 0").Scan(&disabled)
stats["total"] = total
stats["enabled"] = enabled
stats["disabled"] = disabled
return stats
}
// ========================
// 日志持久化相关方法
// ========================
// LogEntry 日志条目
type LogEntry struct {
ID int64 `json:"id"`
Timestamp time.Time `json:"timestamp"`
Level string `json:"level"`
Message string `json:"message"`
Email string `json:"email,omitempty"`
Module string `json:"module,omitempty"`
}
// InsertLog 插入日志
func (d *DB) InsertLog(timestamp time.Time, level, message, email, module string) error {
_, err := d.db.Exec(`
INSERT INTO app_logs (timestamp, level, message, email, module)
VALUES (?, ?, ?, ?, ?)
`, timestamp, level, message, email, module)
return err
}
// GetLogs 获取最近的日志
func (d *DB) GetLogs(limit int) ([]LogEntry, error) {
if limit <= 0 {
limit = 100
}
if limit > 1000 {
limit = 1000
}
rows, err := d.db.Query(`
SELECT id, timestamp, level, message, COALESCE(email, ''), module
FROM app_logs
ORDER BY timestamp DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var logs []LogEntry
for rows.Next() {
var log LogEntry
if err := rows.Scan(&log.ID, &log.Timestamp, &log.Level, &log.Message, &log.Email, &log.Module); err != nil {
continue
}
logs = append(logs, log)
}
// 反转顺序(让最新的在后面,符合日志显示习惯)
for i, j := 0, len(logs)-1; i < j; i, j = i+1, j-1 {
logs[i], logs[j] = logs[j], logs[i]
}
return logs, nil
}
// GetLogsByModule 按模块获取日志
func (d *DB) GetLogsByModule(module string, page, pageSize int) ([]LogEntry, int, error) {
if page < 1 {
page = 1
}
if pageSize <= 0 {
pageSize = 20
}
// 获取总数
var total int
d.db.QueryRow("SELECT COUNT(*) FROM app_logs WHERE module = ?", module).Scan(&total)
offset := (page - 1) * pageSize
rows, err := d.db.Query(`
SELECT id, timestamp, level, message, COALESCE(email, ''), module
FROM app_logs
WHERE module = ?
ORDER BY timestamp DESC
LIMIT ? OFFSET ?
`, module, pageSize, offset)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var logs []LogEntry
for rows.Next() {
var log LogEntry
if err := rows.Scan(&log.ID, &log.Timestamp, &log.Level, &log.Message, &log.Email, &log.Module); err != nil {
continue
}
logs = append(logs, log)
}
return logs, total, nil
}
// GetLogsByModuleAndLevel 按模块和级别获取日志
func (d *DB) GetLogsByModuleAndLevel(module, level string, page, pageSize int) ([]LogEntry, int, error) {
if page < 1 {
page = 1
}
if pageSize <= 0 {
pageSize = 20
}
// 构建查询
countQuery := "SELECT COUNT(*) FROM app_logs WHERE module = ?"
selectQuery := `
SELECT id, timestamp, level, message, COALESCE(email, ''), module
FROM app_logs
WHERE module = ?`
args := []interface{}{module}
if level != "" {
countQuery += " AND level = ?"
selectQuery += " AND level = ?"
args = append(args, level)
}
// 获取总数
var total int
d.db.QueryRow(countQuery, args...).Scan(&total)
offset := (page - 1) * pageSize
selectQuery += " ORDER BY timestamp DESC LIMIT ? OFFSET ?"
args = append(args, pageSize, offset)
rows, err := d.db.Query(selectQuery, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var logs []LogEntry
for rows.Next() {
var log LogEntry
if err := rows.Scan(&log.ID, &log.Timestamp, &log.Level, &log.Message, &log.Email, &log.Module); err != nil {
continue
}
logs = append(logs, log)
}
return logs, total, nil
}
// ClearLogs 清空所有日志
func (d *DB) ClearLogs() error {
_, err := d.db.Exec("DELETE FROM app_logs")
return err
}
// ClearLogsByModule 按模块清空日志
func (d *DB) ClearLogsByModule(module string) (int64, error) {
result, err := d.db.Exec("DELETE FROM app_logs WHERE module = ?", module)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// CleanupOldLogs 清理旧日志(保留最近 N 条)
func (d *DB) CleanupOldLogs(keepCount int) (int64, error) {
if keepCount <= 0 {
keepCount = 5000
}
result, err := d.db.Exec(`
DELETE FROM app_logs
WHERE id NOT IN (
SELECT id FROM app_logs ORDER BY timestamp DESC LIMIT ?
)
`, keepCount)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// Close 关闭数据库
func (d *DB) Close() error {
if d.db != nil {
return d.db.Close()
}
return nil
}