Files
codexTool/utils.py
2026-01-21 22:24:36 +08:00

463 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ==================== 工具函数模块 ====================
# 通用工具函数: CSV 记录、JSON 追踪等
import os
import csv
import json
import time
from datetime import datetime
from config import CSV_FILE, TEAM_TRACKER_FILE
from logger import log
def save_to_csv(email: str, password: str, team_name: str = "", status: str = "success", crs_id: str = ""):
"""保存账号信息到 CSV 文件
Args:
email: 邮箱地址
password: 密码
team_name: Team 名称
status: 状态 (success/failed)
crs_id: CRS 账号 ID
"""
file_exists = os.path.exists(CSV_FILE)
with open(CSV_FILE, 'a', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
if not file_exists:
writer.writerow(['email', 'password', 'team', 'status', 'crs_id', 'timestamp'])
writer.writerow([
email,
password,
team_name,
status,
crs_id,
datetime.now().strftime('%Y-%m-%d %H:%M:%S')
])
log.info(f"保存到 {CSV_FILE}", icon="save")
def load_team_tracker() -> dict:
"""加载 Team 追踪记录
Returns:
dict: {"teams": {"team_name": [{"email": "...", "status": "..."}]}}
"""
if os.path.exists(TEAM_TRACKER_FILE):
try:
with open(TEAM_TRACKER_FILE, 'r', encoding='utf-8') as f:
return json.load(f)
except Exception as e:
log.warning(f"加载追踪记录失败: {e}")
return {"teams": {}, "last_updated": None}
def save_team_tracker(tracker: dict):
"""保存 Team 追踪记录"""
tracker["last_updated"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
try:
with open(TEAM_TRACKER_FILE, 'w', encoding='utf-8') as f:
json.dump(tracker, f, ensure_ascii=False, indent=2)
except Exception as e:
log.warning(f"保存追踪记录失败: {e}")
def add_account_to_tracker(tracker: dict, team_name: str, email: str, status: str = "invited"):
"""添加账号到追踪记录
Args:
tracker: 追踪记录
team_name: Team 名称
email: 邮箱地址
status: 状态 (invited/registered/authorized/completed)
"""
if team_name not in tracker["teams"]:
tracker["teams"][team_name] = []
# 检查是否已存在
for account in tracker["teams"][team_name]:
if account["email"] == email:
account["status"] = status
account["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return
# 添加新记录
tracker["teams"][team_name].append({
"email": email,
"status": status,
"created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
def update_account_status(tracker: dict, team_name: str, email: str, status: str):
"""更新账号状态"""
if team_name in tracker["teams"]:
for account in tracker["teams"][team_name]:
if account["email"] == email:
account["status"] = status
account["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return
def remove_account_from_tracker(tracker: dict, team_name: str, email: str) -> bool:
"""从 tracker 中移除账号
Args:
tracker: 追踪记录
team_name: Team 名称
email: 邮箱地址
Returns:
bool: 是否成功移除
"""
if team_name in tracker["teams"]:
original_len = len(tracker["teams"][team_name])
tracker["teams"][team_name] = [
acc for acc in tracker["teams"][team_name]
if acc["email"] != email
]
return len(tracker["teams"][team_name]) < original_len
return False
def get_team_account_count(tracker: dict, team_name: str) -> int:
"""获取 Team 已记录的账号数量"""
if team_name in tracker["teams"]:
return len(tracker["teams"][team_name])
return 0
def get_incomplete_accounts(tracker: dict, team_name: str) -> list:
"""获取未完成的账号列表 (非 completed 状态)
Args:
tracker: 追踪记录
team_name: Team 名称
Returns:
list: [{"email": "...", "status": "...", "password": "...", "role": "..."}]
"""
incomplete = []
if team_name in tracker.get("teams", {}):
for account in tracker["teams"][team_name]:
status = account.get("status", "")
# 只要不是 completed 都算未完成,需要继续处理
if status != "completed":
incomplete.append({
"email": account["email"],
"status": status,
"password": account.get("password", ""),
"role": account.get("role", "member") # 包含角色信息
})
return incomplete
def get_all_incomplete_accounts(tracker: dict) -> dict:
"""获取所有 Team 的未完成账号
Returns:
dict: {"team_name": [{"email": "...", "status": "..."}]}
"""
result = {}
for team_name in tracker.get("teams", {}):
incomplete = get_incomplete_accounts(tracker, team_name)
if incomplete:
result[team_name] = incomplete
return result
def add_account_with_password(tracker: dict, team_name: str, email: str, password: str, status: str = "invited"):
"""添加账号到追踪记录 (带密码)"""
if team_name not in tracker["teams"]:
tracker["teams"][team_name] = []
# 检查是否已存在
for account in tracker["teams"][team_name]:
if account["email"] == email:
account["status"] = status
account["password"] = password
account["updated_at"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
return
# 添加新记录
tracker["teams"][team_name].append({
"email": email,
"password": password,
"status": status,
"role": "member", # 角色: owner 或 member
"created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
def print_summary(results: list):
"""打印执行摘要
Args:
results: [{"team": "...", "email": "...", "status": "...", "crs_id": "..."}]
"""
# 去重 (按 email 去重,保留最后一条记录)
seen = {}
for r in results:
email = r.get("email", "")
if email:
seen[email] = r
results = list(seen.values())
log.separator("=", 60)
log.header("执行摘要")
log.separator("=", 60)
success_count = sum(1 for r in results if r.get("status") == "success")
failed_count = len(results) - success_count
log.info(f"总计: {len(results)} 个账号")
log.success(f"成功: {success_count}")
if failed_count > 0:
log.error(f"失败: {failed_count}")
# 按 Team 分组
teams = {}
for r in results:
team = r.get("team", "Unknown")
if team not in teams:
teams[team] = {"success": [], "failed": []}
if r.get("status") == "success":
teams[team]["success"].append(r)
else:
teams[team]["failed"].append(r)
if teams:
log.info("按 Team 统计:")
for team_name, data in teams.items():
success_list = data["success"]
failed_list = data["failed"]
log.info(f" {team_name}: 成功 {len(success_list)}, 失败 {len(failed_list)}")
# 只显示失败的账号详情
for acc in failed_list:
log.error(f"{acc.get('email', 'Unknown')}")
log.separator("=", 60)
def format_duration(seconds: float) -> str:
"""格式化时长"""
if seconds < 60:
return f"{seconds:.1f}s"
elif seconds < 3600:
minutes = seconds / 60
return f"{minutes:.1f}m"
else:
hours = seconds / 3600
return f"{hours:.1f}h"
class Timer:
"""计时器"""
def __init__(self, name: str = ""):
self.name = name
self.start_time = None
self.end_time = None
def start(self):
self.start_time = time.time()
if self.name:
log.info(f"{self.name} 开始", icon="time")
def stop(self):
self.end_time = time.time()
duration = self.end_time - self.start_time
if self.name:
log.info(f"{self.name} 完成 ({format_duration(duration)})", icon="time")
return duration
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.stop()
def add_team_owners_to_tracker(tracker: dict, password: str) -> int:
"""将 team.json 中的 Team Owner 添加到 tracker走授权流程
只添加有 token 的 Team Owner没有 token 的跳过格式3会在登录时单独处理
Args:
tracker: team_tracker 数据
password: 默认账号密码 (旧格式使用)
Returns:
int: 添加的数量
"""
from config import INCLUDE_TEAM_OWNERS, TEAMS
if not INCLUDE_TEAM_OWNERS:
return 0
if not TEAMS:
return 0
added_count = 0
for team in TEAMS:
# 跳过没有 token 的 Team格式3会在登录时单独处理
if not team.get("auth_token"):
continue
team_name = team.get("name", "")
team_format = team.get("format", "old")
# 获取邮箱
email = team.get("owner_email", "")
if not email:
raw_data = team.get("raw", {})
email = raw_data.get("user", {}).get("email", "")
# 获取密码
owner_password = team.get("owner_password", "") or password
if not team_name or not email:
continue
# 检查是否已在 tracker 中
existing = False
if team_name in tracker.get("teams", {}):
for acc in tracker["teams"][team_name]:
if acc.get("email") == email:
existing = True
break
if not existing:
# 添加到 tracker
if team_name not in tracker["teams"]:
tracker["teams"][team_name] = []
# 根据格式和授权状态决定 tracker 状态
# - 新格式且已授权: 状态为 completed (跳过)
# - 新格式未授权: 状态为 registered (需要授权)
# - 旧格式: 使用 OTP 登录授权,状态为 team_owner
if team_format == "new":
if team.get("authorized"):
status = "completed" # 已授权,跳过
else:
status = "registered" # 需要授权
else:
status = "team_owner" # 旧格式,使用 OTP 登录授权
tracker["teams"][team_name].append({
"email": email,
"password": owner_password,
"status": status,
"role": "owner",
"created_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"updated_at": datetime.now().strftime('%Y-%m-%d %H:%M:%S')
})
log.info(f"Team Owner 添加到 tracker: {email} -> {team_name} (格式: {team_format}, 状态: {status})")
added_count += 1
if added_count > 0:
log.info(f"已添加 {added_count} 个 Team Owner 到 tracker", icon="sync")
return added_count
def get_completed_teams(tracker: dict) -> list:
"""获取所有处理成功的 Team 列表
处理成功的定义:所有账号状态都是 completed且没有失败的账号
Args:
tracker: team_tracker 数据
Returns:
list: [{"name": "team_name", "total": 4, "completed": 4, "failed": 0, "pending": 0}]
"""
completed_teams = []
teams_data = tracker.get("teams", {})
for team_name, accounts in teams_data.items():
total = len(accounts)
if total == 0:
continue
completed = sum(1 for a in accounts if a.get("status") == "completed")
failed = sum(1 for a in accounts if "fail" in a.get("status", "").lower())
pending = total - completed - failed
# 全部完成且没有失败的才算成功
if completed == total and failed == 0:
completed_teams.append({
"name": team_name,
"total": total,
"completed": completed,
"failed": failed,
"pending": pending
})
return completed_teams
def remove_team_from_tracker(tracker: dict, team_name: str) -> bool:
"""从 tracker 中删除整个 Team
Args:
tracker: team_tracker 数据
team_name: Team 名称
Returns:
bool: 是否成功删除
"""
if team_name in tracker.get("teams", {}):
del tracker["teams"][team_name]
return True
return False
def batch_remove_completed_teams(tracker: dict) -> dict:
"""批量删除所有处理成功的 Team
Args:
tracker: team_tracker 数据
Returns:
dict: {"success": 删除成功数, "failed": 删除失败数, "total": 总数, "details": [...]}
"""
completed_teams = get_completed_teams(tracker)
results = {
"success": 0,
"failed": 0,
"total": len(completed_teams),
"details": []
}
for team_info in completed_teams:
team_name = team_info["name"]
if remove_team_from_tracker(tracker, team_name):
results["success"] += 1
results["details"].append({
"name": team_name,
"status": "success",
"accounts": team_info["total"]
})
else:
results["failed"] += 1
results["details"].append({
"name": team_name,
"status": "failed",
"message": "删除失败"
})
return results