Files
codexautopool/backend/internal/database/sqlite.go

466 lines
12 KiB
Go

package database
import (
"database/sql"
"fmt"
"time"
_ "github.com/mattn/go-sqlite3"
)
// TeamOwner 账号结构
type TeamOwner struct {
ID int64 `json:"id"`
Email string `json:"email"`
Password string `json:"password,omitempty"`
Token string `json:"token,omitempty"`
AccountID string `json:"account_id"`
Status string `json:"status"`
CreatedAt time.Time `json:"created_at"`
}
// DB 数据库管理器
type DB struct {
db *sql.DB
}
// 全局数据库实例
var Instance *DB
// Init 初始化数据库
func Init(dbPath string) error {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return fmt.Errorf("打开数据库失败: %w", err)
}
Instance = &DB{db: db}
if err := Instance.createTables(); err != nil {
return fmt.Errorf("创建表失败: %w", err)
}
fmt.Printf("[数据库] SQLite 已连接: %s\n", dbPath)
return nil
}
// createTables 创建表
func (d *DB) createTables() error {
_, err := d.db.Exec(`
CREATE TABLE IF NOT EXISTS team_owners (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT NOT NULL UNIQUE,
password TEXT,
token TEXT,
account_id TEXT NOT NULL,
status TEXT DEFAULT 'valid',
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_team_owners_email ON team_owners(email);
CREATE INDEX IF NOT EXISTS idx_team_owners_status ON team_owners(status);
CREATE INDEX IF NOT EXISTS idx_team_owners_account_id ON team_owners(account_id);
-- 配置表 (key-value 形式)
CREATE TABLE IF NOT EXISTS app_config (
key TEXT PRIMARY KEY,
value TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- 批次运行记录表
CREATE TABLE IF NOT EXISTS batch_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at DATETIME NOT NULL,
finished_at DATETIME,
total_owners INTEGER DEFAULT 0,
total_registered INTEGER DEFAULT 0,
total_added_to_s2a INTEGER DEFAULT 0,
success_rate REAL DEFAULT 0,
duration_seconds INTEGER DEFAULT 0,
status TEXT DEFAULT 'running',
errors TEXT
);
CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at);
`)
return err
}
// GetConfig 获取配置值
func (d *DB) GetConfig(key string) (string, error) {
var value string
err := d.db.QueryRow("SELECT value FROM app_config WHERE key = ?", key).Scan(&value)
if err == sql.ErrNoRows {
return "", nil
}
return value, err
}
// SetConfig 设置配置值
func (d *DB) SetConfig(key, value string) error {
_, err := d.db.Exec(`
INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP)
ON CONFLICT(key) DO UPDATE SET value = ?, updated_at = CURRENT_TIMESTAMP
`, key, value, value)
return err
}
// GetAllConfig 获取所有配置
func (d *DB) GetAllConfig() (map[string]string, error) {
rows, err := d.db.Query("SELECT key, value FROM app_config")
if err != nil {
return nil, err
}
defer rows.Close()
result := make(map[string]string)
for rows.Next() {
var key, value string
if err := rows.Scan(&key, &value); err != nil {
continue
}
result[key] = value
}
return result, nil
}
// AddTeamOwner 添加 Team Owner
func (d *DB) AddTeamOwner(owner TeamOwner) (int64, error) {
result, err := d.db.Exec(`
INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at)
VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP)
`, owner.Email, owner.Password, owner.Token, owner.AccountID)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// AddTeamOwners 批量添加
func (d *DB) AddTeamOwners(owners []TeamOwner) (int, error) {
tx, err := d.db.Begin()
if err != nil {
return 0, err
}
defer tx.Rollback()
stmt, err := tx.Prepare(`
INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at)
VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP)
`)
if err != nil {
return 0, err
}
defer stmt.Close()
count := 0
for _, owner := range owners {
_, err := stmt.Exec(owner.Email, owner.Password, owner.Token, owner.AccountID)
if err != nil {
fmt.Printf("[数据库] 插入失败 %s: %v\n", owner.Email, err)
continue
}
count++
}
if err := tx.Commit(); err != nil {
return 0, err
}
return count, nil
}
// GetTeamOwners 获取列表
func (d *DB) GetTeamOwners(status string, limit, offset int) ([]TeamOwner, int, error) {
query := "SELECT id, email, password, token, account_id, status, created_at FROM team_owners WHERE 1=1"
countQuery := "SELECT COUNT(*) FROM team_owners WHERE 1=1"
args := []interface{}{}
if status != "" {
query += " AND status = ?"
countQuery += " AND status = ?"
args = append(args, status)
}
var total int
err := d.db.QueryRow(countQuery, args...).Scan(&total)
if err != nil {
return nil, 0, err
}
query += " ORDER BY created_at DESC LIMIT ? OFFSET ?"
args = append(args, limit, offset)
rows, err := d.db.Query(query, args...)
if err != nil {
return nil, 0, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt)
if err != nil {
continue
}
owners = append(owners, owner)
}
return owners, total, nil
}
// GetPendingOwners 获取待处理(排除已使用和处理中的)
func (d *DB) GetPendingOwners() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at
FROM team_owners WHERE status = 'valid'
ORDER BY created_at ASC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt)
if err != nil {
continue
}
owners = append(owners, owner)
}
return owners, nil
}
// MarkOwnerAsUsed 标记 Owner 为已使用
func (d *DB) MarkOwnerAsUsed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'used' WHERE email = ?", email)
return err
}
// MarkOwnerAsProcessing 标记 Owner 为处理中
func (d *DB) MarkOwnerAsProcessing(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'processing' WHERE email = ?", email)
return err
}
// MarkOwnerAsFailed 标记 Owner 为失败(可重试)
func (d *DB) MarkOwnerAsFailed(email string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = 'valid' WHERE email = ?", email)
return err
}
// UpdateOwnerStatus 更新状态
func (d *DB) UpdateOwnerStatus(id int64, status string) error {
_, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id)
return err
}
// DeleteTeamOwner 删除
func (d *DB) DeleteTeamOwner(id int64) error {
_, err := d.db.Exec("DELETE FROM team_owners WHERE id = ?", id)
return err
}
// ClearTeamOwners 清空
func (d *DB) ClearTeamOwners() error {
_, err := d.db.Exec("DELETE FROM team_owners")
return err
}
// GetOwnersWithoutAccountID 获取缺少 account_id 的 owners
func (d *DB) GetOwnersWithoutAccountID() ([]TeamOwner, error) {
rows, err := d.db.Query(`
SELECT id, email, password, token, account_id, status, created_at
FROM team_owners WHERE account_id = '' OR account_id IS NULL
ORDER BY created_at DESC
`)
if err != nil {
return nil, err
}
defer rows.Close()
var owners []TeamOwner
for rows.Next() {
var owner TeamOwner
err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt)
if err != nil {
continue
}
owners = append(owners, owner)
}
return owners, nil
}
// UpdateOwnerAccountID 更新 owner 的 account_id
func (d *DB) UpdateOwnerAccountID(id int64, accountID string) error {
_, err := d.db.Exec("UPDATE team_owners SET account_id = ? WHERE id = ?", accountID, id)
return err
}
// GetOwnerStats 获取统计
func (d *DB) GetOwnerStats() map[string]int {
stats := map[string]int{
"total": 0,
"valid": 0,
"registered": 0,
"pooled": 0,
}
var count int
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners").Scan(&count); err == nil {
stats["total"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'valid'").Scan(&count); err == nil {
stats["valid"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'registered'").Scan(&count); err == nil {
stats["registered"] = count
}
if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'pooled'").Scan(&count); err == nil {
stats["pooled"] = count
}
return stats
}
// BatchRun 批次运行记录
type BatchRun struct {
ID int64 `json:"id"`
StartedAt time.Time `json:"started_at"`
FinishedAt time.Time `json:"finished_at,omitempty"`
TotalOwners int `json:"total_owners"`
TotalRegistered int `json:"total_registered"`
TotalAddedToS2A int `json:"total_added_to_s2a"`
SuccessRate float64 `json:"success_rate"`
DurationSeconds int `json:"duration_seconds"`
Status string `json:"status"`
Errors string `json:"errors,omitempty"`
}
// CreateBatchRun 创建批次记录
func (d *DB) CreateBatchRun(totalOwners int) (int64, error) {
result, err := d.db.Exec(
`INSERT INTO batch_runs (started_at, total_owners, status) VALUES (?, ?, 'running')`,
time.Now(), totalOwners,
)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// UpdateBatchRun 更新批次记录
func (d *DB) UpdateBatchRun(id int64, registered, addedToS2A int, errors string) error {
finishedAt := time.Now()
// 获取开始时间计算耗时
var startedAt time.Time
var totalOwners int
d.db.QueryRow("SELECT started_at, total_owners FROM batch_runs WHERE id = ?", id).Scan(&startedAt, &totalOwners)
duration := int(finishedAt.Sub(startedAt).Seconds())
// 计算成功率(以入库数为准)
var successRate float64
expectedTotal := totalOwners * 4 // 每个 owner 应产生 4 个成员
if expectedTotal > 0 {
successRate = float64(addedToS2A) / float64(expectedTotal) * 100
}
_, err := d.db.Exec(
`UPDATE batch_runs SET
finished_at = ?,
total_registered = ?,
total_added_to_s2a = ?,
success_rate = ?,
duration_seconds = ?,
status = 'completed',
errors = ?
WHERE id = ?`,
finishedAt, registered, addedToS2A, successRate, duration, errors, id,
)
return err
}
// GetBatchRuns 获取批次记录列表
func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) {
if limit <= 0 {
limit = 50
}
rows, err := d.db.Query(`
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
success_rate, duration_seconds, status, COALESCE(errors, '')
FROM batch_runs
ORDER BY started_at DESC
LIMIT ?
`, limit)
if err != nil {
return nil, err
}
defer rows.Close()
var runs []BatchRun
for rows.Next() {
var run BatchRun
var finishedAt sql.NullTime
err := rows.Scan(
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
)
if err != nil {
continue
}
if finishedAt.Valid {
run.FinishedAt = finishedAt.Time
}
runs = append(runs, run)
}
return runs, nil
}
// GetBatchRunStats 获取批次统计
func (d *DB) GetBatchRunStats() map[string]interface{} {
stats := make(map[string]interface{})
var totalAdded, todayAdded int
var avgRate float64
// 总入库数
d.db.QueryRow("SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs").Scan(&totalAdded)
stats["total_added"] = totalAdded
// 今日入库数
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE DATE(started_at) = DATE('now', 'localtime')
`).Scan(&todayAdded)
stats["today_added"] = todayAdded
// 平均成功率
d.db.QueryRow("SELECT COALESCE(AVG(success_rate), 0) FROM batch_runs WHERE status = 'completed'").Scan(&avgRate)
stats["avg_success_rate"] = avgRate
// 本周入库数
var weekAdded int
d.db.QueryRow(`
SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs
WHERE started_at >= datetime('now', '-7 days')
`).Scan(&weekAdded)
stats["week_added"] = weekAdded
return stats
}
// Close 关闭数据库
func (d *DB) Close() error {
if d.db != nil {
return d.db.Close()
}
return nil
}