package database import ( "database/sql" "fmt" "time" _ "github.com/mattn/go-sqlite3" ) // TeamOwner 账号结构 type TeamOwner struct { ID int64 `json:"id"` Email string `json:"email"` Password string `json:"password,omitempty"` Token string `json:"token,omitempty"` AccountID string `json:"account_id"` Status string `json:"status"` CreatedAt time.Time `json:"created_at"` LastCheckedAt *time.Time `json:"last_checked_at,omitempty"` } // DB 数据库管理器 type DB struct { db *sql.DB } // 全局数据库实例 var Instance *DB // Init 初始化数据库 func Init(dbPath string) error { db, err := sql.Open("sqlite3", dbPath) if err != nil { return fmt.Errorf("打开数据库失败: %w", err) } Instance = &DB{db: db} if err := Instance.createTables(); err != nil { return fmt.Errorf("创建表失败: %w", err) } // 执行数据库迁移 if err := Instance.migrate(); err != nil { fmt.Printf("[数据库] 迁移警告: %v\n", err) } fmt.Printf("[数据库] SQLite 已连接: %s\n", dbPath) return nil } // migrate 数据库迁移 func (d *DB) migrate() error { // 添加 last_checked_at 列(如果不存在) _, err := d.db.Exec(`ALTER TABLE team_owners ADD COLUMN last_checked_at DATETIME`) if err != nil && !isColumnExistsError(err) { return err } return nil } // isColumnExistsError 判断是否是列已存在的错误 func isColumnExistsError(err error) bool { return err != nil && (err.Error() == "duplicate column name: last_checked_at" || err.Error() == "table team_owners already has a column named last_checked_at") } // createTables 创建表 func (d *DB) createTables() error { _, err := d.db.Exec(` CREATE TABLE IF NOT EXISTS team_owners ( id INTEGER PRIMARY KEY AUTOINCREMENT, email TEXT NOT NULL UNIQUE, password TEXT, token TEXT, account_id TEXT NOT NULL, status TEXT DEFAULT 'valid', created_at DATETIME DEFAULT CURRENT_TIMESTAMP, last_checked_at DATETIME ); CREATE INDEX IF NOT EXISTS idx_team_owners_email ON team_owners(email); CREATE INDEX IF NOT EXISTS idx_team_owners_status ON team_owners(status); CREATE INDEX IF NOT EXISTS idx_team_owners_account_id ON team_owners(account_id); -- 配置表 (key-value 形式) CREATE TABLE IF NOT EXISTS app_config ( key TEXT PRIMARY KEY, value TEXT NOT NULL, updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ); -- 批次运行记录表 CREATE TABLE IF NOT EXISTS batch_runs ( id INTEGER PRIMARY KEY AUTOINCREMENT, started_at DATETIME NOT NULL, finished_at DATETIME, total_owners INTEGER DEFAULT 0, total_registered INTEGER DEFAULT 0, total_added_to_s2a INTEGER DEFAULT 0, success_rate REAL DEFAULT 0, duration_seconds INTEGER DEFAULT 0, status TEXT DEFAULT 'running', errors TEXT ); CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at); `) return err } // GetConfig 获取配置值 func (d *DB) GetConfig(key string) (string, error) { var value string err := d.db.QueryRow("SELECT value FROM app_config WHERE key = ?", key).Scan(&value) if err == sql.ErrNoRows { return "", nil } return value, err } // SetConfig 设置配置值 func (d *DB) SetConfig(key, value string) error { _, err := d.db.Exec(` INSERT INTO app_config (key, value, updated_at) VALUES (?, ?, CURRENT_TIMESTAMP) ON CONFLICT(key) DO UPDATE SET value = ?, updated_at = CURRENT_TIMESTAMP `, key, value, value) return err } // GetAllConfig 获取所有配置 func (d *DB) GetAllConfig() (map[string]string, error) { rows, err := d.db.Query("SELECT key, value FROM app_config") if err != nil { return nil, err } defer rows.Close() result := make(map[string]string) for rows.Next() { var key, value string if err := rows.Scan(&key, &value); err != nil { continue } result[key] = value } return result, nil } // AddTeamOwner 添加 Team Owner func (d *DB) AddTeamOwner(owner TeamOwner) (int64, error) { result, err := d.db.Exec(` INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at) VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP) `, owner.Email, owner.Password, owner.Token, owner.AccountID) if err != nil { return 0, err } return result.LastInsertId() } // AddTeamOwners 批量添加 func (d *DB) AddTeamOwners(owners []TeamOwner) (int, error) { tx, err := d.db.Begin() if err != nil { return 0, err } defer tx.Rollback() stmt, err := tx.Prepare(` INSERT OR REPLACE INTO team_owners (email, password, token, account_id, status, created_at) VALUES (?, ?, ?, ?, 'valid', CURRENT_TIMESTAMP) `) if err != nil { return 0, err } defer stmt.Close() count := 0 for _, owner := range owners { _, err := stmt.Exec(owner.Email, owner.Password, owner.Token, owner.AccountID) if err != nil { fmt.Printf("[数据库] 插入失败 %s: %v\n", owner.Email, err) continue } count++ } if err := tx.Commit(); err != nil { return 0, err } return count, nil } // GetTeamOwners 获取列表 func (d *DB) GetTeamOwners(status string, limit, offset int) ([]TeamOwner, int, error) { query := "SELECT id, email, password, token, account_id, status, created_at, last_checked_at FROM team_owners WHERE 1=1" countQuery := "SELECT COUNT(*) FROM team_owners WHERE 1=1" args := []interface{}{} if status != "" { query += " AND status = ?" countQuery += " AND status = ?" args = append(args, status) } var total int err := d.db.QueryRow(countQuery, args...).Scan(&total) if err != nil { return nil, 0, err } // 排序:已使用(used, pooled)排前面,其次按创建时间降序 query += " ORDER BY CASE WHEN status IN ('used', 'pooled') THEN 0 ELSE 1 END, created_at DESC LIMIT ? OFFSET ?" args = append(args, limit, offset) rows, err := d.db.Query(query, args...) if err != nil { return nil, 0, err } defer rows.Close() var owners []TeamOwner for rows.Next() { var owner TeamOwner var lastCheckedAt sql.NullTime err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt) if err != nil { continue } if lastCheckedAt.Valid { owner.LastCheckedAt = &lastCheckedAt.Time } owners = append(owners, owner) } return owners, total, nil } // GetPendingOwners 获取待处理(排除已使用和处理中的) func (d *DB) GetPendingOwners() ([]TeamOwner, error) { rows, err := d.db.Query(` SELECT id, email, password, token, account_id, status, created_at, last_checked_at FROM team_owners WHERE status = 'valid' ORDER BY created_at ASC `) if err != nil { return nil, err } defer rows.Close() var owners []TeamOwner for rows.Next() { var owner TeamOwner var lastCheckedAt sql.NullTime err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt) if err != nil { continue } if lastCheckedAt.Valid { owner.LastCheckedAt = &lastCheckedAt.Time } owners = append(owners, owner) } return owners, nil } // GetOwnersForBanCheck 获取需要检查封禁状态的母号 func (d *DB) GetOwnersForBanCheck(checkIntervalHours int) ([]TeamOwner, error) { // 获取 valid 状态且 超过检查间隔 或 从未检查过 的母号 rows, err := d.db.Query(` SELECT id, email, password, token, account_id, status, created_at, last_checked_at FROM team_owners WHERE status = 'valid' AND (last_checked_at IS NULL OR last_checked_at < datetime('now', ?)) ORDER BY last_checked_at ASC NULLS FIRST, created_at ASC `, fmt.Sprintf("-%d hours", checkIntervalHours)) if err != nil { return nil, err } defer rows.Close() var owners []TeamOwner for rows.Next() { var owner TeamOwner var lastCheckedAt sql.NullTime err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt) if err != nil { continue } if lastCheckedAt.Valid { owner.LastCheckedAt = &lastCheckedAt.Time } owners = append(owners, owner) } return owners, nil } // UpdateOwnerLastCheckedAt 更新母号的最后检查时间 func (d *DB) UpdateOwnerLastCheckedAt(id int64) error { _, err := d.db.Exec("UPDATE team_owners SET last_checked_at = CURRENT_TIMESTAMP WHERE id = ?", id) return err } // UpdateOwnerLastCheckedAtByEmail 通过邮箱更新母号的最后检查时间 func (d *DB) UpdateOwnerLastCheckedAtByEmail(email string) error { _, err := d.db.Exec("UPDATE team_owners SET last_checked_at = CURRENT_TIMESTAMP WHERE email = ?", email) return err } // MarkOwnerAsUsed 标记 Owner 为已使用 func (d *DB) MarkOwnerAsUsed(email string) error { _, err := d.db.Exec("UPDATE team_owners SET status = 'used' WHERE email = ?", email) return err } // MarkOwnerAsProcessing 标记 Owner 为处理中 func (d *DB) MarkOwnerAsProcessing(email string) error { _, err := d.db.Exec("UPDATE team_owners SET status = 'processing' WHERE email = ?", email) return err } // MarkOwnerAsFailed 标记 Owner 为失败(可重试) func (d *DB) MarkOwnerAsFailed(email string) error { _, err := d.db.Exec("UPDATE team_owners SET status = 'valid' WHERE email = ?", email) return err } // MarkOwnerAsInvalid 标记 Owner 为无效(Team 被封禁,永久跳过) func (d *DB) MarkOwnerAsInvalid(email string) error { _, err := d.db.Exec("UPDATE team_owners SET status = 'invalid' WHERE email = ?", email) return err } // UpdateOwnerStatus 更新状态 func (d *DB) UpdateOwnerStatus(id int64, status string) error { _, err := d.db.Exec("UPDATE team_owners SET status = ? WHERE id = ?", status, id) return err } // DeleteTeamOwner 删除 func (d *DB) DeleteTeamOwner(id int64) error { _, err := d.db.Exec("DELETE FROM team_owners WHERE id = ?", id) return err } // ClearTeamOwners 清空 func (d *DB) ClearTeamOwners() error { _, err := d.db.Exec("DELETE FROM team_owners") return err } // ClearUsedOwners 清理已使用的母号(status = 'used' 或 'pooled') func (d *DB) ClearUsedOwners() (int64, error) { result, err := d.db.Exec("DELETE FROM team_owners WHERE status IN ('used', 'pooled')") if err != nil { return 0, err } return result.RowsAffected() } // GetOwnersWithoutAccountID 获取缺少 account_id 的 owners func (d *DB) GetOwnersWithoutAccountID() ([]TeamOwner, error) { rows, err := d.db.Query(` SELECT id, email, password, token, account_id, status, created_at, last_checked_at FROM team_owners WHERE account_id = '' OR account_id IS NULL ORDER BY created_at DESC `) if err != nil { return nil, err } defer rows.Close() var owners []TeamOwner for rows.Next() { var owner TeamOwner var lastCheckedAt sql.NullTime err := rows.Scan(&owner.ID, &owner.Email, &owner.Password, &owner.Token, &owner.AccountID, &owner.Status, &owner.CreatedAt, &lastCheckedAt) if err != nil { continue } if lastCheckedAt.Valid { owner.LastCheckedAt = &lastCheckedAt.Time } owners = append(owners, owner) } return owners, nil } // UpdateOwnerAccountID 更新 owner 的 account_id func (d *DB) UpdateOwnerAccountID(id int64, accountID string) error { _, err := d.db.Exec("UPDATE team_owners SET account_id = ? WHERE id = ?", accountID, id) return err } // GetOwnerStats 获取统计 func (d *DB) GetOwnerStats() map[string]int { stats := map[string]int{ "total": 0, "valid": 0, "processing": 0, "used": 0, "invalid": 0, } var count int if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners").Scan(&count); err == nil { stats["total"] = count } if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'valid'").Scan(&count); err == nil { stats["valid"] = count } if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'processing'").Scan(&count); err == nil { stats["processing"] = count } if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'used'").Scan(&count); err == nil { stats["used"] = count } if err := d.db.QueryRow("SELECT COUNT(*) FROM team_owners WHERE status = 'invalid'").Scan(&count); err == nil { stats["invalid"] = count } return stats } // BatchRun 批次运行记录 type BatchRun struct { ID int64 `json:"id"` StartedAt time.Time `json:"started_at"` FinishedAt time.Time `json:"finished_at,omitempty"` TotalOwners int `json:"total_owners"` TotalRegistered int `json:"total_registered"` TotalAddedToS2A int `json:"total_added_to_s2a"` SuccessRate float64 `json:"success_rate"` DurationSeconds int `json:"duration_seconds"` Status string `json:"status"` Errors string `json:"errors,omitempty"` } // CreateBatchRun 创建批次记录 func (d *DB) CreateBatchRun(totalOwners int) (int64, error) { result, err := d.db.Exec( `INSERT INTO batch_runs (started_at, total_owners, status) VALUES (?, ?, 'running')`, time.Now(), totalOwners, ) if err != nil { return 0, err } return result.LastInsertId() } // UpdateBatchRun 更新批次记录 func (d *DB) UpdateBatchRun(id int64, registered, addedToS2A int, errors string) error { finishedAt := time.Now() // 获取开始时间计算耗时 var startedAt time.Time var totalOwners int d.db.QueryRow("SELECT started_at, total_owners FROM batch_runs WHERE id = ?", id).Scan(&startedAt, &totalOwners) duration := int(finishedAt.Sub(startedAt).Seconds()) // 计算成功率(以入库数为准) var successRate float64 expectedTotal := totalOwners * 4 // 每个 owner 应产生 4 个成员 if expectedTotal > 0 { successRate = float64(addedToS2A) / float64(expectedTotal) * 100 } _, err := d.db.Exec( `UPDATE batch_runs SET finished_at = ?, total_registered = ?, total_added_to_s2a = ?, success_rate = ?, duration_seconds = ?, status = 'completed', errors = ? WHERE id = ?`, finishedAt, registered, addedToS2A, successRate, duration, errors, id, ) return err } // GetBatchRuns 获取批次记录列表 func (d *DB) GetBatchRuns(limit int) ([]BatchRun, error) { if limit <= 0 { limit = 50 } rows, err := d.db.Query(` SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a, success_rate, duration_seconds, status, COALESCE(errors, '') FROM batch_runs ORDER BY started_at DESC LIMIT ? `, limit) if err != nil { return nil, err } defer rows.Close() var runs []BatchRun for rows.Next() { var run BatchRun var finishedAt sql.NullTime err := rows.Scan( &run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered, &run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors, ) if err != nil { continue } if finishedAt.Valid { run.FinishedAt = finishedAt.Time } runs = append(runs, run) } return runs, nil } // CleanupStuckBatchRuns 清理卡住的 running 状态批次记录 func (d *DB) CleanupStuckBatchRuns() (int64, error) { result, err := d.db.Exec(` UPDATE batch_runs SET status = 'completed', finished_at = COALESCE(finished_at, started_at), duration_seconds = 0 WHERE status = 'running' `) if err != nil { return 0, err } return result.RowsAffected() } // GetBatchRunStats 获取批次统计 func (d *DB) GetBatchRunStats() map[string]interface{} { stats := make(map[string]interface{}) var totalAdded, todayAdded int var avgRate float64 // 总入库数 d.db.QueryRow("SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs").Scan(&totalAdded) stats["total_added"] = totalAdded // 今日入库数 d.db.QueryRow(` SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs WHERE DATE(started_at) = DATE('now', 'localtime') `).Scan(&todayAdded) stats["today_added"] = todayAdded // 平均成功率 d.db.QueryRow("SELECT COALESCE(AVG(success_rate), 0) FROM batch_runs WHERE status = 'completed'").Scan(&avgRate) stats["avg_success_rate"] = avgRate // 本周入库数 var weekAdded int d.db.QueryRow(` SELECT COALESCE(SUM(total_added_to_s2a), 0) FROM batch_runs WHERE started_at >= datetime('now', '-7 days') `).Scan(&weekAdded) stats["week_added"] = weekAdded return stats } // Close 关闭数据库 func (d *DB) Close() error { if d.db != nil { return d.db.Close() } return nil }