364 lines
11 KiB
Python
364 lines
11 KiB
Python
# ==================== 工具函数模块 ====================
|
||
# 通用工具函数: CSV 记录、JSON 追踪等
|
||
|
||
import os
|
||
import csv
|
||
import json
|
||
import time
|
||
from datetime import datetime
|
||
|
||
from config import CSV_FILE, TEAM_TRACKER_FILE
|
||
from logger import log
|
||
|
||
|
||
def save_to_csv(email: str, password: str, team_name: str = "", status: str = "success", crs_id: str = ""):
|
||
"""保存账号信息到 CSV 文件
|
||
|
||
Args:
|
||
email: 邮箱地址
|
||
password: 密码
|
||
team_name: Team 名称
|
||
status: 状态 (success/failed)
|
||
crs_id: CRS 账号 ID
|
||
"""
|
||
file_exists = os.path.exists(CSV_FILE)
|
||
|
||
with open(CSV_FILE, 'a', newline='', encoding='utf-8') as f:
|
||
writer = csv.writer(f)
|
||
|
||
if not file_exists:
|
||
writer.writerow(['email', 'password', 'team', 'status', 'crs_id', 'timestamp'])
|
||
|
||
writer.writerow([
|
||
email,
|
||
password,
|
||
team_name,
|
||
status,
|
||
crs_id,
|
||
datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
])
|
||
|
||
log.info(f"保存到 {CSV_FILE}", icon="save")
|
||
|
||
|
||
def load_team_tracker() -> dict:
|
||
"""加载 Team 追踪记录
|
||
|
||
Returns:
|
||
dict: {"teams": {"team_name": [{"email": "...", "status": "..."}]}}
|
||
"""
|
||
if os.path.exists(TEAM_TRACKER_FILE):
|
||
try:
|
||
with open(TEAM_TRACKER_FILE, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
except Exception as e:
|
||
log.warning(f"加载追踪记录失败: {e}")
|
||
|
||
return {"teams": {}, "last_updated": None}
|
||
|
||
|
||
def save_team_tracker(tracker: dict):
|
||
"""保存 Team 追踪记录"""
|
||
tracker["last_updated"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
|
||
try:
|
||
with open(TEAM_TRACKER_FILE, 'w', encoding='utf-8') as f:
|
||
json.dump(tracker, f, ensure_ascii=False, indent=2)
|
||
except Exception as e:
|
||
log.warning(f"保存追踪记录失败: {e}")
|
||
|
||
|
||
def add_account_to_tracker(tracker: dict, team_name: str, email: str, status: str = "invited"):
|
||
"""添加账号到追踪记录
|
||
|
||
Args:
|
||
tracker: 追踪记录
|
||
team_name: Team 名称
|
||
email: 邮箱地址
|
||
status: 状态 (invited/registered/authorized/completed)
|
||
"""
|
||
if team_name not in tracker["teams"]:
|
||
tracker["teams"][team_name] = []
|
||
|
||
# 检查是否已存在
|
||
for account in tracker["teams"][team_name]:
|
||
if account["email"] == email:
|
||
account["status"] = status
|
||
account["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
return
|
||
|
||
# 添加新记录
|
||
tracker["teams"][team_name].append({
|
||
"email": email,
|
||
"status": status,
|
||
"created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
"updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
})
|
||
|
||
|
||
def update_account_status(tracker: dict, team_name: str, email: str, status: str):
|
||
"""更新账号状态"""
|
||
if team_name in tracker["teams"]:
|
||
for account in tracker["teams"][team_name]:
|
||
if account["email"] == email:
|
||
account["status"] = status
|
||
account["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
return
|
||
|
||
|
||
def remove_account_from_tracker(tracker: dict, team_name: str, email: str) -> bool:
|
||
"""从 tracker 中移除账号
|
||
|
||
Args:
|
||
tracker: 追踪记录
|
||
team_name: Team 名称
|
||
email: 邮箱地址
|
||
|
||
Returns:
|
||
bool: 是否成功移除
|
||
"""
|
||
if team_name in tracker["teams"]:
|
||
original_len = len(tracker["teams"][team_name])
|
||
tracker["teams"][team_name] = [
|
||
acc for acc in tracker["teams"][team_name]
|
||
if acc["email"] != email
|
||
]
|
||
return len(tracker["teams"][team_name]) < original_len
|
||
return False
|
||
|
||
|
||
def get_team_account_count(tracker: dict, team_name: str) -> int:
|
||
"""获取 Team 已记录的账号数量"""
|
||
if team_name in tracker["teams"]:
|
||
return len(tracker["teams"][team_name])
|
||
return 0
|
||
|
||
|
||
def get_incomplete_accounts(tracker: dict, team_name: str) -> list:
|
||
"""获取未完成的账号列表 (非 completed 状态)
|
||
|
||
Args:
|
||
tracker: 追踪记录
|
||
team_name: Team 名称
|
||
|
||
Returns:
|
||
list: [{"email": "...", "status": "...", "password": "...", "role": "..."}]
|
||
"""
|
||
incomplete = []
|
||
if team_name in tracker.get("teams", {}):
|
||
for account in tracker["teams"][team_name]:
|
||
status = account.get("status", "")
|
||
# 只要不是 completed 都算未完成,需要继续处理
|
||
if status != "completed":
|
||
incomplete.append({
|
||
"email": account["email"],
|
||
"status": status,
|
||
"password": account.get("password", ""),
|
||
"role": account.get("role", "member") # 包含角色信息
|
||
})
|
||
return incomplete
|
||
|
||
|
||
def get_all_incomplete_accounts(tracker: dict) -> dict:
|
||
"""获取所有 Team 的未完成账号
|
||
|
||
Returns:
|
||
dict: {"team_name": [{"email": "...", "status": "..."}]}
|
||
"""
|
||
result = {}
|
||
for team_name in tracker.get("teams", {}):
|
||
incomplete = get_incomplete_accounts(tracker, team_name)
|
||
if incomplete:
|
||
result[team_name] = incomplete
|
||
return result
|
||
|
||
|
||
def add_account_with_password(tracker: dict, team_name: str, email: str, password: str, status: str = "invited"):
|
||
"""添加账号到追踪记录 (带密码)"""
|
||
if team_name not in tracker["teams"]:
|
||
tracker["teams"][team_name] = []
|
||
|
||
# 检查是否已存在
|
||
for account in tracker["teams"][team_name]:
|
||
if account["email"] == email:
|
||
account["status"] = status
|
||
account["password"] = password
|
||
account["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
return
|
||
|
||
# 添加新记录
|
||
tracker["teams"][team_name].append({
|
||
"email": email,
|
||
"password": password,
|
||
"status": status,
|
||
"role": "member", # 角色: owner 或 member
|
||
"created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
"updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
})
|
||
|
||
|
||
def print_summary(results: list):
|
||
"""打印执行摘要
|
||
|
||
Args:
|
||
results: [{"team": "...", "email": "...", "status": "...", "crs_id": "..."}]
|
||
"""
|
||
log.separator("=", 60)
|
||
log.header("执行摘要")
|
||
log.separator("=", 60)
|
||
|
||
success_count = sum(1 for r in results if r.get("status") == "success")
|
||
failed_count = len(results) - success_count
|
||
|
||
log.info(f"总计: {len(results)} 个账号")
|
||
log.success(f"成功: {success_count}")
|
||
log.error(f"失败: {failed_count}")
|
||
|
||
# 按 Team 分组
|
||
teams = {}
|
||
for r in results:
|
||
team = r.get("team", "Unknown")
|
||
if team not in teams:
|
||
teams[team] = {"success": 0, "failed": 0, "accounts": []}
|
||
|
||
if r.get("status") == "success":
|
||
teams[team]["success"] += 1
|
||
else:
|
||
teams[team]["failed"] += 1
|
||
|
||
teams[team]["accounts"].append(r)
|
||
|
||
log.info("按 Team 统计:")
|
||
for team_name, data in teams.items():
|
||
log.info(f"{team_name}: 成功 {data['success']}, 失败 {data['failed']}", icon="team")
|
||
for acc in data["accounts"]:
|
||
if acc.get("status") == "success":
|
||
log.success(f"{acc.get('email', 'Unknown')}")
|
||
else:
|
||
log.error(f"{acc.get('email', 'Unknown')}")
|
||
|
||
log.separator("=", 60)
|
||
|
||
|
||
def format_duration(seconds: float) -> str:
|
||
"""格式化时长"""
|
||
if seconds < 60:
|
||
return f"{seconds:.1f}s"
|
||
elif seconds < 3600:
|
||
minutes = seconds / 60
|
||
return f"{minutes:.1f}m"
|
||
else:
|
||
hours = seconds / 3600
|
||
return f"{hours:.1f}h"
|
||
|
||
|
||
class Timer:
|
||
"""计时器"""
|
||
|
||
def __init__(self, name: str = ""):
|
||
self.name = name
|
||
self.start_time = None
|
||
self.end_time = None
|
||
|
||
def start(self):
|
||
self.start_time = time.time()
|
||
if self.name:
|
||
log.info(f"{self.name} 开始", icon="time")
|
||
|
||
def stop(self):
|
||
self.end_time = time.time()
|
||
duration = self.end_time - self.start_time
|
||
if self.name:
|
||
log.info(f"{self.name} 完成 ({format_duration(duration)})", icon="time")
|
||
return duration
|
||
|
||
def __enter__(self):
|
||
self.start()
|
||
return self
|
||
|
||
def __exit__(self, *args):
|
||
self.stop()
|
||
|
||
|
||
def add_team_owners_to_tracker(tracker: dict, password: str) -> int:
|
||
"""将 team.json 中的 Team Owner 添加到 tracker,走授权流程
|
||
|
||
只添加有 token 的 Team Owner,没有 token 的跳过(格式3会在登录时单独处理)
|
||
|
||
Args:
|
||
tracker: team_tracker 数据
|
||
password: 默认账号密码 (旧格式使用)
|
||
|
||
Returns:
|
||
int: 添加的数量
|
||
"""
|
||
from config import INCLUDE_TEAM_OWNERS, TEAMS
|
||
|
||
if not INCLUDE_TEAM_OWNERS:
|
||
return 0
|
||
|
||
if not TEAMS:
|
||
return 0
|
||
|
||
added_count = 0
|
||
for team in TEAMS:
|
||
# 跳过没有 token 的 Team(格式3会在登录时单独处理)
|
||
if not team.get("auth_token"):
|
||
continue
|
||
|
||
team_name = team.get("name", "")
|
||
team_format = team.get("format", "old")
|
||
|
||
# 获取邮箱
|
||
email = team.get("owner_email", "")
|
||
if not email:
|
||
raw_data = team.get("raw", {})
|
||
email = raw_data.get("user", {}).get("email", "")
|
||
|
||
# 获取密码
|
||
owner_password = team.get("owner_password", "") or password
|
||
|
||
if not team_name or not email:
|
||
continue
|
||
|
||
# 检查是否已在 tracker 中
|
||
existing = False
|
||
if team_name in tracker.get("teams", {}):
|
||
for acc in tracker["teams"][team_name]:
|
||
if acc.get("email") == email:
|
||
existing = True
|
||
break
|
||
|
||
if not existing:
|
||
# 添加到 tracker
|
||
if team_name not in tracker["teams"]:
|
||
tracker["teams"][team_name] = []
|
||
|
||
# 根据格式和授权状态决定 tracker 状态
|
||
# - 新格式且已授权: 状态为 completed (跳过)
|
||
# - 新格式未授权: 状态为 registered (需要授权)
|
||
# - 旧格式: 使用 OTP 登录授权,状态为 team_owner
|
||
if team_format == "new":
|
||
if team.get("authorized"):
|
||
status = "completed" # 已授权,跳过
|
||
else:
|
||
status = "registered" # 需要授权
|
||
else:
|
||
status = "team_owner" # 旧格式,使用 OTP 登录授权
|
||
|
||
tracker["teams"][team_name].append({
|
||
"email": email,
|
||
"password": owner_password,
|
||
"status": status,
|
||
"role": "owner",
|
||
"created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
||
"updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
||
})
|
||
log.info(f"Team Owner 添加到 tracker: {email} -> {team_name} (格式: {team_format}, 状态: {status})")
|
||
added_count += 1
|
||
|
||
if added_count > 0:
|
||
log.info(f"已添加 {added_count} 个 Team Owner 到 tracker", icon="sync")
|
||
|
||
return added_count
|