Files
codexautopool/backend/internal/database/sqlite.go

579 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package database
import (
"database/sql"
"fmt"
"time"
_ "github.com/mattn/go-sqlite3"
)
// TeamOwner 账号结构
type TeamOwner struct {
ID int64 `json:"id"`
Email string `json:"email"`
Password string `json:"password,omitempty"`
Token string `json:"token,omitempty"`
AccountID string `json:"account_id"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
LastCheckedAt *time.Time `json:"last_checked_at,omitempty"`
}
// DB 数据库管理器
type DB struct {
db *sql.DB
}
// 全局数据库实例
var Instance *DB
// Init 初始化数据库
func Init(dbPath string) error {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return fmt.Errorf("打开数据库失败: %w", err)
}
Instance = &DB{db: db}
if err := Instance.createTables(); err != nil {
return fmt.Errorf("创建表失败: %w", err)
}
// 执行数据库迁移
if err := Instance.migrate(); err != nil {
fmt.Printf("[数据库] 迁移警告: %v\n", err)
}
fmt.Printf("[数据库] SQLite 已连接: %s\n", dbPath)
return nil
}
// migrate 数据库迁移
func (d *DB) migrate() error {
// 添加 last_checked_at 列(如果不存在)
_, err := d.db.Exec(`ALTER TABLE team_owners ADD COLUMN last_checked_at DATETIME`)
if err != nil && !isColumnExistsError(err) {
return err
}
return nil
}
// isColumnExistsError 判断是否是列已存在的错误
func isColumnExistsError(err error) bool {
return err != nil && (err.Error() == "duplicate column name: last_checked_at" ||
err.Error() == "table team_owners already has a column named last_checked_at")
}
// createTables 创建表
func (d *DB) createTables() error {
_, err := d.db.Exec(`
CREATE TABLE IF NOT EXISTS team_owners (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
password TEXT,
token TEXT,
account_id TEXT NOT NULL,
status TEXT DEFAULT 'valid',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
last_checked_at DATETIME
);
CREATE INDEX IF NOT EXISTS idx_team_owners_email ON team_owners(email);
CREATE INDEX IF NOT EXISTS idx_team_owners_status ON team_owners(status);
CREATE INDEX IF NOT EXISTS idx_team_owners_account_id ON team_owners(account_id);
-- 配置表 (key-value 形式)
CREATE TABLE IF NOT EXISTS app_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- 批次运行记录表
CREATE TABLE IF NOT EXISTS batch_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at DATETIME NOT NULL,
finished_at DATETIME,
total_owners INTEGER DEFAULT 0,
total_registered INTEGER DEFAULT 0,
total_added_to_s2a INTEGER DEFAULT 0,
success_rate REAL DEFAULT 0,
duration_seconds INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
errors TEXT
);
CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at);
`)
return err
}
// GetConfig 获取配置值
func (d *DB) GetConfig(key string) (string, error) {
var value string
err := d.db.QueryRow("SELECT value FROM app_config WHERE key = ?", key).Scan(&value)
if err == sql.ErrNoRows {
return "", nil
}
return value, err
}
// SetConfig 设置配置值
func (d *DB) SetConfig(key, value string) error {
_, err := d.db.Exec(`
INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE SET value = ?, updated_at = CURRENT_TIMESTAMP
`, key, value, value)
return err
}
// GetAllConfig 获取所有配置
func (d *DB) GetAllConfig() (map[string]string, error) {
rows, err := d.db.Query("SELECT key, value FROM app_config")
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]string)
for rows.Next() {
var key, value string
if err := rows.Scan(&key, &value); err != nil {
continue
}
result[key] = value
}
return result, nil
}
// AddTeamOwner 添加 Team Owner
func (d *DB) AddTeamOwner(owner TeamOwner) (int64, error) {
result, err := d.db.Exec(`
INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at)
VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP)
`, owner.Email, owner.Password, owner.Token, owner.AccountID)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// AddTeamOwners 批量添加
func (d *DB) AddTeamOwners(owners []TeamOwner) (int, error) {
tx, err := d.db.Begin()
if err != nil {
return 0, err
}
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at)
VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP)
`)
if err != nil {
return 0, err
}
defer stmt.Close()
count := 0
for _, owner := range owners {
_, err := stmt.Exec(owner.Email, owner.Password, owner.Token, owner.AccountID)
if err != nil {
fmt.Printf("[数据库] 插入失败 %s: %v\n", owner.Email, err)
continue
}
count++
}
if err := tx.Commit(); err != nil {
return 0, err
}
return count, nil
}
// GetTeamOwners 获取列表
func (d *DB) GetTeamOwners(status string, limit, offset int) ([]TeamOwner, int, error) {
query := "SELECT id, email, password, token, account_id, status, created_at, last_checked_at FROM team_owners WHERE 1=1"
countQuery := "SELECT COUNT(*) FROM team_owners WHERE 1=1"
args := []interface{}{}
if status != "" {
query += " AND status = ?"
countQuery += " AND status = ?"
args = append(args, status)
}
var total int
err := d.db.QueryRow(countQuery, args...).Scan(&total)
if err != nil {
return nil, 0, err
}
// 排序:已使用(used, pooled)排前面,其次按创建时间降序
query += " ORDER BY CASE WHEN status IN ('used', 'pooled') THEN 0 ELSE 1 END, created_at DESC LIMIT ? OFFSET ?"
args = append(args, limit, offset)
rows, err := d.db.Query(query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, total, nil
}
// GetPendingOwners 获取待处理(排除已使用和处理中的)
func (d *DB) GetPendingOwners() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at, last_checked_at
FROM team_owners WHERE status = 'valid'
ORDER BY created_at ASC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, nil
}
// GetOwnersForBanCheck 获取需要检查封禁状态的母号
func (d *DB) GetOwnersForBanCheck(checkIntervalHours int) ([]TeamOwner, error) {
// 获取 valid 状态且 超过检查间隔 或 从未检查过 的母号
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at, last_checked_at
FROM team_owners
WHERE status = 'valid'
AND (last_checked_at IS NULL OR last_checked_at < datetime('now', ?))
ORDER BY last_checked_at ASC NULLS FIRST, created_at ASC
`, fmt.Sprintf("-%d hours", checkIntervalHours))
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, nil
}
// UpdateOwnerLastCheckedAt 更新母号的最后检查时间
func (d *DB) UpdateOwnerLastCheckedAt(id int64) error {
_, err := d.db.Exec("UPDATE team_owners SET last_checked_at = CURRENT_TIMESTAMP WHERE id = ?", id)
return err
}
// UpdateOwnerLastCheckedAtByEmail 通过邮箱更新母号的最后检查时间
func (d *DB) UpdateOwnerLastCheckedAtByEmail(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET last_checked_at = CURRENT_TIMESTAMP WHERE email = ?", email)
return err
}
// MarkOwnerAsUsed 标记 Owner 为已使用
func (d *DB) MarkOwnerAsUsed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'used' WHERE email = ?", email)
return err
}
// MarkOwnerAsProcessing 标记 Owner 为处理中
func (d *DB) MarkOwnerAsProcessing(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'processing' WHERE email = ?", email)
return err
}
// MarkOwnerAsFailed 标记 Owner 为失败(可重试)
func (d *DB) MarkOwnerAsFailed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'valid' WHERE email = ?", email)
return err
}
// MarkOwnerAsInvalid 标记 Owner 为无效Team 被封禁,永久跳过)
func (d *DB) MarkOwnerAsInvalid(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'invalid' WHERE email = ?", email)
return err
}
// UpdateOwnerStatus 更新状态
func (d *DB) UpdateOwnerStatus(id int64, status string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id)
return err
}
// DeleteTeamOwner 删除
func (d *DB) DeleteTeamOwner(id int64) error {
_, err := d.db.Exec("DELETE FROM team_owners WHERE id = ?", id)
return err
}
// ClearTeamOwners 清空
func (d *DB) ClearTeamOwners() error {
_, err := d.db.Exec("DELETE FROM team_owners")
return err
}
// ClearUsedOwners 清理已使用的母号status = 'used' 或 'pooled'
func (d *DB) ClearUsedOwners() (int64, error) {
result, err := d.db.Exec("DELETE FROM team_owners WHERE status IN ('used', 'pooled')")
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// GetOwnersWithoutAccountID 获取缺少 account_id 的 owners
func (d *DB) GetOwnersWithoutAccountID() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at, last_checked_at
FROM team_owners WHERE account_id = '' OR account_id IS NULL
ORDER BY created_at DESC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
var lastCheckedAt sql.NullTime
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt)
if err != nil {
continue
}
if lastCheckedAt.Valid {
owner.LastCheckedAt = &lastCheckedAt.Time
}
owners = append(owners, owner)
}
return owners, nil
}
// UpdateOwnerAccountID 更新 owner 的 account_id
func (d *DB) UpdateOwnerAccountID(id int64, accountID string) error {
_, err := d.db.Exec("UPDATE team_owners SET account_id = ? WHERE id = ?", accountID, id)
return err
}
// GetOwnerStats 获取统计
func (d *DB) GetOwnerStats() map[string]int {
stats := map[string]int{
"total": 0,
"valid": 0,
"processing": 0,
"used": 0,
"invalid": 0,
}
var count int
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners").Scan(&count); err == nil {
stats["total"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'valid'").Scan(&count); err == nil {
stats["valid"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'processing'").Scan(&count); err == nil {
stats["processing"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'used'").Scan(&count); err == nil {
stats["used"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'invalid'").Scan(&count); err == nil {
stats["invalid"] = count
}
return stats
}
// BatchRun 批次运行记录
type BatchRun struct {
ID int64 `json:"id"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
TotalOwners int `json:"total_owners"`
TotalRegistered int `json:"total_registered"`
TotalAddedToS2A int `json:"total_added_to_s2a"`
SuccessRate float64 `json:"success_rate"`
DurationSeconds int `json:"duration_seconds"`
Status string `json:"status"`
Errors string `json:"errors,omitempty"`
}
// CreateBatchRun 创建批次记录
func (d *DB) CreateBatchRun(totalOwners int) (int64, error) {
result, err := d.db.Exec(
`INSERT INTO batch_runs (started_at, total_owners, status) VALUES (?, ?, 'running')`,
time.Now(), totalOwners,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// UpdateBatchRun 更新批次记录
func (d *DB) UpdateBatchRun(id int64, registered, addedToS2A int, errors string) error {
finishedAt := time.Now()
// 获取开始时间计算耗时
var startedAt time.Time
var totalOwners int
d.db.QueryRow("SELECT started_at, total_owners FROM batch_runs WHERE id = ?", id).Scan(&startedAt, &totalOwners)
duration := int(finishedAt.Sub(startedAt).Seconds())
// 计算成功率(以入库数为准)
var successRate float64
expectedTotal := totalOwners * 4 // 每个 owner 应产生 4 个成员
if expectedTotal > 0 {
successRate = float64(addedToS2A) / float64(expectedTotal) * 100
}
_, err := d.db.Exec(
`UPDATE batch_runs SET
finished_at = ?,
total_registered = ?,
total_added_to_s2a = ?,
success_rate = ?,
duration_seconds = ?,
status = 'completed',
errors = ?
WHERE id = ?`,
finishedAt, registered, addedToS2A, successRate, duration, errors, id,
)
return err
}
// GetBatchRuns 获取批次记录列表
func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) {
if limit <= 0 {
limit = 50
}
rows, err := d.db.Query(`
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
success_rate, duration_seconds, status, COALESCE(errors, '')
FROM batch_runs
ORDER BY started_at DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var runs []BatchRun
for rows.Next() {
var run BatchRun
var finishedAt sql.NullTime
err := rows.Scan(
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
)
if err != nil {
continue
}
if finishedAt.Valid {
run.FinishedAt = finishedAt.Time
}
runs = append(runs, run)
}
return runs, nil
}
// CleanupStuckBatchRuns 清理卡住的 running 状态批次记录
func (d *DB) CleanupStuckBatchRuns() (int64, error) {
result, err := d.db.Exec(`
UPDATE batch_runs
SET status = 'completed',
finished_at = COALESCE(finished_at, started_at),
duration_seconds = 0
WHERE status = 'running'
`)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
// GetBatchRunStats 获取批次统计
func (d *DB) GetBatchRunStats() map[string]interface{} {
stats := make(map[string]interface{})
var totalAdded, todayAdded int
var avgRate float64
// 总入库数
d.db.QueryRow("SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs").Scan(&totalAdded)
stats["total_added"] = totalAdded
// 今日入库数
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE DATE(started_at) = DATE('now', 'localtime')
`).Scan(&todayAdded)
stats["today_added"] = todayAdded
// 平均成功率
d.db.QueryRow("SELECT COALESCE(AVG(success_rate), 0) FROM batch_runs WHERE status = 'completed'").Scan(&avgRate)
stats["avg_success_rate"] = avgRate
// 本周入库数
var weekAdded int
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE started_at >= datetime('now', '-7 days')
`).Scan(&weekAdded)
stats["week_added"] = weekAdded
return stats
}
// Close 关闭数据库
func (d *DB) Close() error {
if d.db != nil {
return d.db.Close()
}
return nil
}