feat: Add batch processing and upload functionality, including new backend APIs, logging system, SQLite database, and dedicated frontend pages.
This commit is contained in:
@@ -394,6 +394,8 @@ func checkSingleOwnerBan(owner database.TeamOwner, proxy string) BanCheckResult
|
||||
// 账户被封禁
|
||||
logger.Warning(fmt.Sprintf("母号被封禁: %s - %s", owner.Email, accountStatus.Error), owner.Email, "ban-check")
|
||||
database.Instance.MarkOwnerAsInvalid(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("母号被封禁已删除: %s", owner.Email), owner.Email, "ban-check")
|
||||
result.Status = "banned"
|
||||
result.Message = accountStatus.Error
|
||||
|
||||
@@ -401,6 +403,8 @@ func checkSingleOwnerBan(owner database.TeamOwner, proxy string) BanCheckResult
|
||||
// Token 过期
|
||||
logger.Warning(fmt.Sprintf("母号 Token 过期: %s", owner.Email), owner.Email, "ban-check")
|
||||
database.Instance.MarkOwnerAsInvalid(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("母号Token过期已删除: %s", owner.Email), owner.Email, "ban-check")
|
||||
result.Status = "banned"
|
||||
result.Message = "Token 已过期"
|
||||
|
||||
|
||||
@@ -177,6 +177,85 @@ func HandleTeamProcessStop(w http.ResponseWriter, r *http.Request) {
|
||||
Success(w, map[string]string{"message": "已发送停止信号"})
|
||||
}
|
||||
|
||||
// HandleBatchHistory GET /api/batch/history - 获取历史批次(分页)
|
||||
func HandleBatchHistory(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
|
||||
return
|
||||
}
|
||||
|
||||
if database.Instance == nil {
|
||||
Error(w, http.StatusInternalServerError, "数据库未初始化")
|
||||
return
|
||||
}
|
||||
|
||||
// 获取分页参数
|
||||
page := 1
|
||||
pageSize := 5
|
||||
if p := r.URL.Query().Get("page"); p != "" {
|
||||
if v, err := fmt.Sscanf(p, "%d", &page); err == nil && v > 0 {
|
||||
// page已设置
|
||||
}
|
||||
}
|
||||
if ps := r.URL.Query().Get("page_size"); ps != "" {
|
||||
if v, err := fmt.Sscanf(ps, "%d", &pageSize); err == nil && v > 0 {
|
||||
// pageSize已设置
|
||||
}
|
||||
}
|
||||
|
||||
runs, total, err := database.Instance.GetBatchRunsWithPagination(page, pageSize)
|
||||
if err != nil {
|
||||
Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
totalPages := (total + pageSize - 1) / pageSize
|
||||
|
||||
Success(w, map[string]interface{}{
|
||||
"runs": runs,
|
||||
"total": total,
|
||||
"page": page,
|
||||
"page_size": pageSize,
|
||||
"total_pages": totalPages,
|
||||
})
|
||||
}
|
||||
|
||||
// HandleBatchDetail GET /api/batch/detail?id=xxx - 获取批次详情(包含Team结果)
|
||||
func HandleBatchDetail(w http.ResponseWriter, r *http.Request) {
|
||||
if r.Method != http.MethodGet {
|
||||
Error(w, http.StatusMethodNotAllowed, "仅支持 GET")
|
||||
return
|
||||
}
|
||||
|
||||
if database.Instance == nil {
|
||||
Error(w, http.StatusInternalServerError, "数据库未初始化")
|
||||
return
|
||||
}
|
||||
|
||||
idStr := r.URL.Query().Get("id")
|
||||
if idStr == "" {
|
||||
Error(w, http.StatusBadRequest, "缺少批次ID")
|
||||
return
|
||||
}
|
||||
|
||||
var batchID int64
|
||||
if _, err := fmt.Sscanf(idStr, "%d", &batchID); err != nil {
|
||||
Error(w, http.StatusBadRequest, "无效的批次ID")
|
||||
return
|
||||
}
|
||||
|
||||
run, results, err := database.Instance.GetBatchRunWithResults(batchID)
|
||||
if err != nil {
|
||||
Error(w, http.StatusInternalServerError, fmt.Sprintf("查询失败: %v", err))
|
||||
return
|
||||
}
|
||||
|
||||
Success(w, map[string]interface{}{
|
||||
"batch": run,
|
||||
"results": results,
|
||||
})
|
||||
}
|
||||
|
||||
// runTeamProcess 执行 Team 批量处理 - 使用工作池模式
|
||||
func runTeamProcess(req TeamProcessRequest) {
|
||||
totalOwners := len(req.Owners)
|
||||
@@ -272,6 +351,23 @@ func runTeamProcess(req TeamProcessRequest) {
|
||||
totalRegistered += result.Registered
|
||||
totalAddedToS2A += result.AddedToS2A
|
||||
allErrors = append(allErrors, result.Errors...)
|
||||
|
||||
// 保存单个Team结果到数据库
|
||||
if database.Instance != nil && batchID > 0 {
|
||||
dbResult := database.BatchTeamResult{
|
||||
TeamIndex: result.TeamIndex,
|
||||
OwnerEmail: result.OwnerEmail,
|
||||
TeamID: result.TeamID,
|
||||
Registered: result.Registered,
|
||||
AddedToS2A: result.AddedToS2A,
|
||||
MemberEmails: result.MemberEmails,
|
||||
Errors: result.Errors,
|
||||
DurationMs: result.DurationMs,
|
||||
}
|
||||
if err := database.Instance.SaveBatchTeamResult(batchID, dbResult); err != nil {
|
||||
logger.Error(fmt.Sprintf("保存Team结果失败: %v", err), result.OwnerEmail, "team")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 计算成功率
|
||||
@@ -324,6 +420,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
||||
if database.Instance != nil {
|
||||
if success {
|
||||
database.Instance.MarkOwnerAsUsed(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("%s 母号已使用并删除: %s", logPrefix, owner.Email), owner.Email, "team")
|
||||
} else {
|
||||
// 失败时恢复为 valid,允许重试
|
||||
database.Instance.MarkOwnerAsFailed(owner.Email)
|
||||
@@ -356,6 +454,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
||||
logger.Warning(fmt.Sprintf("%s Token 无效或过期,标记为无效", logPrefix), owner.Email, "team")
|
||||
if database.Instance != nil {
|
||||
database.Instance.MarkOwnerAsInvalid(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("%s 母号无效已删除: %s", logPrefix, owner.Email), owner.Email, "team")
|
||||
}
|
||||
return result
|
||||
}
|
||||
@@ -377,6 +477,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
||||
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用: %v", logPrefix, err), owner.Email, "team")
|
||||
if database.Instance != nil {
|
||||
database.Instance.MarkOwnerAsUsed(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("%s 母号已使用并删除(邀请已满): %s", logPrefix, owner.Email), owner.Email, "team")
|
||||
}
|
||||
result.Errors = append(result.Errors, "Team 邀请已满")
|
||||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||||
@@ -390,6 +492,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
||||
logger.Error(fmt.Sprintf("%s Team 被封禁,标记为无效: %v", logPrefix, err), owner.Email, "team")
|
||||
if database.Instance != nil {
|
||||
database.Instance.MarkOwnerAsInvalid(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("%s 母号无效已删除(Team被封禁): %s", logPrefix, owner.Email), owner.Email, "team")
|
||||
}
|
||||
result.Errors = append(result.Errors, "Team 被封禁")
|
||||
result.DurationMs = time.Since(startTime).Milliseconds()
|
||||
@@ -426,6 +530,8 @@ func processSingleTeam(idx int, req TeamProcessRequest) (result TeamProcessResul
|
||||
logger.Warning(fmt.Sprintf("%s Team 邀请已满,标记母号为已使用,停止后续处理", logPrefix), owner.Email, "team")
|
||||
if database.Instance != nil {
|
||||
database.Instance.MarkOwnerAsUsed(owner.Email)
|
||||
database.Instance.DeleteTeamOwnerByEmail(owner.Email)
|
||||
logger.Info(fmt.Sprintf("%s 母号已使用并删除(Team耗尽): %s", logPrefix, owner.Email), owner.Email, "team")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package database
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@@ -104,8 +105,26 @@ func (d *DB) createTables() error {
|
||||
status TEXT DEFAULT 'running',
|
||||
errors TEXT
|
||||
);
|
||||
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_batch_runs_started_at ON batch_runs(started_at);
|
||||
|
||||
-- 批次Team处理结果表
|
||||
CREATE TABLE IF NOT EXISTS batch_team_results (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
batch_id INTEGER NOT NULL,
|
||||
team_index INTEGER NOT NULL,
|
||||
owner_email TEXT NOT NULL,
|
||||
team_id TEXT,
|
||||
registered INTEGER DEFAULT 0,
|
||||
added_to_s2a INTEGER DEFAULT 0,
|
||||
member_emails TEXT,
|
||||
errors TEXT,
|
||||
duration_ms INTEGER DEFAULT 0,
|
||||
created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
|
||||
FOREIGN KEY (batch_id) REFERENCES batch_runs(id)
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_batch_team_results_batch_id ON batch_team_results(batch_id);
|
||||
`)
|
||||
return err
|
||||
}
|
||||
@@ -346,6 +365,15 @@ func (d *DB) DeleteTeamOwner(id int64) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// DeleteTeamOwnerByEmail 按邮箱删除母号
|
||||
func (d *DB) DeleteTeamOwnerByEmail(email string) (int64, error) {
|
||||
result, err := d.db.Exec("DELETE FROM team_owners WHERE email = ?", email)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return result.RowsAffected()
|
||||
}
|
||||
|
||||
// ClearTeamOwners 清空
|
||||
func (d *DB) ClearTeamOwners() error {
|
||||
_, err := d.db.Exec("DELETE FROM team_owners")
|
||||
@@ -569,6 +597,146 @@ func (d *DB) GetBatchRunStats() map[string]interface{} {
|
||||
return stats
|
||||
}
|
||||
|
||||
// BatchTeamResult 批次Team处理结果
|
||||
type BatchTeamResult struct {
|
||||
ID int64 `json:"id"`
|
||||
BatchID int64 `json:"batch_id"`
|
||||
TeamIndex int `json:"team_index"`
|
||||
OwnerEmail string `json:"owner_email"`
|
||||
TeamID string `json:"team_id"`
|
||||
Registered int `json:"registered"`
|
||||
AddedToS2A int `json:"added_to_s2a"`
|
||||
MemberEmails []string `json:"member_emails"`
|
||||
Errors []string `json:"errors"`
|
||||
DurationMs int64 `json:"duration_ms"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// SaveBatchTeamResult 保存单个Team处理结果
|
||||
func (d *DB) SaveBatchTeamResult(batchID int64, result BatchTeamResult) error {
|
||||
memberEmailsJSON, _ := json.Marshal(result.MemberEmails)
|
||||
errorsJSON, _ := json.Marshal(result.Errors)
|
||||
|
||||
_, err := d.db.Exec(`
|
||||
INSERT INTO batch_team_results (batch_id, team_index, owner_email, team_id, registered, added_to_s2a, member_emails, errors, duration_ms)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
`, batchID, result.TeamIndex, result.OwnerEmail, result.TeamID, result.Registered, result.AddedToS2A, string(memberEmailsJSON), string(errorsJSON), result.DurationMs)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetBatchTeamResults 获取某批次的所有Team结果
|
||||
func (d *DB) GetBatchTeamResults(batchID int64) ([]BatchTeamResult, error) {
|
||||
rows, err := d.db.Query(`
|
||||
SELECT id, batch_id, team_index, owner_email, COALESCE(team_id, ''), registered, added_to_s2a,
|
||||
COALESCE(member_emails, '[]'), COALESCE(errors, '[]'), duration_ms, created_at
|
||||
FROM batch_team_results
|
||||
WHERE batch_id = ?
|
||||
ORDER BY team_index ASC
|
||||
`, batchID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var results []BatchTeamResult
|
||||
for rows.Next() {
|
||||
var r BatchTeamResult
|
||||
var memberEmailsJSON, errorsJSON string
|
||||
err := rows.Scan(&r.ID, &r.BatchID, &r.TeamIndex, &r.OwnerEmail, &r.TeamID, &r.Registered,
|
||||
&r.AddedToS2A, &memberEmailsJSON, &errorsJSON, &r.DurationMs, &r.CreatedAt)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
json.Unmarshal([]byte(memberEmailsJSON), &r.MemberEmails)
|
||||
json.Unmarshal([]byte(errorsJSON), &r.Errors)
|
||||
if r.MemberEmails == nil {
|
||||
r.MemberEmails = []string{}
|
||||
}
|
||||
if r.Errors == nil {
|
||||
r.Errors = []string{}
|
||||
}
|
||||
results = append(results, r)
|
||||
}
|
||||
return results, nil
|
||||
}
|
||||
|
||||
// GetBatchRunsWithPagination 分页获取批次记录
|
||||
func (d *DB) GetBatchRunsWithPagination(page, pageSize int) ([]BatchRun, int, error) {
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
if pageSize <= 0 {
|
||||
pageSize = 5
|
||||
}
|
||||
|
||||
// 获取总数
|
||||
var total int
|
||||
err := d.db.QueryRow("SELECT COUNT(*) FROM batch_runs").Scan(&total)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
|
||||
offset := (page - 1) * pageSize
|
||||
rows, err := d.db.Query(`
|
||||
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
|
||||
success_rate, duration_seconds, status, COALESCE(errors, '')
|
||||
FROM batch_runs
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ? OFFSET ?
|
||||
`, pageSize, offset)
|
||||
if err != nil {
|
||||
return nil, 0, err
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var runs []BatchRun
|
||||
for rows.Next() {
|
||||
var run BatchRun
|
||||
var finishedAt sql.NullTime
|
||||
err := rows.Scan(
|
||||
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
|
||||
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
|
||||
)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
if finishedAt.Valid {
|
||||
run.FinishedAt = finishedAt.Time
|
||||
}
|
||||
runs = append(runs, run)
|
||||
}
|
||||
return runs, total, nil
|
||||
}
|
||||
|
||||
// GetBatchRunWithResults 获取批次详情(包含Team结果)
|
||||
func (d *DB) GetBatchRunWithResults(batchID int64) (*BatchRun, []BatchTeamResult, error) {
|
||||
// 获取批次信息
|
||||
var run BatchRun
|
||||
var finishedAt sql.NullTime
|
||||
err := d.db.QueryRow(`
|
||||
SELECT id, started_at, finished_at, total_owners, total_registered, total_added_to_s2a,
|
||||
success_rate, duration_seconds, status, COALESCE(errors, '')
|
||||
FROM batch_runs WHERE id = ?
|
||||
`, batchID).Scan(
|
||||
&run.ID, &run.StartedAt, &finishedAt, &run.TotalOwners, &run.TotalRegistered,
|
||||
&run.TotalAddedToS2A, &run.SuccessRate, &run.DurationSeconds, &run.Status, &run.Errors,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
if finishedAt.Valid {
|
||||
run.FinishedAt = finishedAt.Time
|
||||
}
|
||||
|
||||
// 获取Team结果
|
||||
results, err := d.GetBatchTeamResults(batchID)
|
||||
if err != nil {
|
||||
return &run, nil, err
|
||||
}
|
||||
|
||||
return &run, results, nil
|
||||
}
|
||||
|
||||
// Close 关闭数据库
|
||||
func (d *DB) Close() error {
|
||||
if d.db != nil {
|
||||
|
||||
@@ -203,6 +203,39 @@ func GetLogs(limit int) []LogEntry {
|
||||
return result
|
||||
}
|
||||
|
||||
// GetLogsByModule 按模块筛选日志并分页(最新的在前)
|
||||
func GetLogsByModule(module string, page, pageSize int) ([]LogEntry, int) {
|
||||
logsMu.RLock()
|
||||
defer logsMu.RUnlock()
|
||||
|
||||
// 倒序收集匹配的日志
|
||||
var filtered []LogEntry
|
||||
for i := len(logs) - 1; i >= 0; i-- {
|
||||
if logs[i].Module == module {
|
||||
filtered = append(filtered, logs[i])
|
||||
}
|
||||
}
|
||||
|
||||
total := len(filtered)
|
||||
if page < 1 {
|
||||
page = 1
|
||||
}
|
||||
if pageSize <= 0 {
|
||||
pageSize = 5
|
||||
}
|
||||
|
||||
start := (page - 1) * pageSize
|
||||
if start >= total {
|
||||
return []LogEntry{}, total
|
||||
}
|
||||
end := start + pageSize
|
||||
if end > total {
|
||||
end = total
|
||||
}
|
||||
|
||||
return filtered[start:end], total
|
||||
}
|
||||
|
||||
// ClearLogs 清空日志
|
||||
func ClearLogs() {
|
||||
logsMu.Lock()
|
||||
|
||||
Reference in New Issue
Block a user