Files
autoClaude/core/account_store.py

269 lines
7.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
线程安全的账号存储模块
统一管理 accounts.txt 的读写操作,避免并发冲突。
支持删除、统计等功能。
"""
import json
import threading
from pathlib import Path
_PROJECT_ROOT = Path(__file__).parent.parent
_ACCOUNTS_FILE = _PROJECT_ROOT / "accounts.txt"
_STATS_FILE = _PROJECT_ROOT / "stats.json"
_lock = threading.Lock()
# ====== 账号池(并发调度)======
# _busy 记录当前被占用的账号行(原始字符串)
_busy: set[str] = set()
def acquire(n: int = 0) -> list[str]:
"""从空闲账号中获取最多 n 个,标记为占用。
Args:
n: 需要的账号数。0 表示尽量获取所有空闲账号(至少 1 个)。
Returns:
被获取的账号行列表(可能少于 n为空表示没有空闲账号。
"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
all_lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
free = [line for line in all_lines if line not in _busy]
if not free:
return []
if n <= 0:
# 获取全部空闲
acquired = free
else:
acquired = free[:n]
_busy.update(acquired)
return acquired
def release(lines: list[str]) -> None:
"""释放指定账号,标记为空闲。"""
with _lock:
for line in lines:
_busy.discard(line)
def pool_status() -> dict:
"""返回账号池状态。"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
all_lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
all_lines = []
total = len(all_lines)
busy = sum(1 for line in all_lines if line in _busy)
return {"total": total, "busy": busy, "free": total - busy}
# ====== 账号操作 ======
def append(email: str, session_key: str, org_uuid: str) -> None:
"""追加一个账号"""
with _lock:
with open(_ACCOUNTS_FILE, "a", encoding="utf-8") as f:
f.write(f"{email}|{session_key}|{org_uuid}\n")
def read_all() -> list[dict]:
"""读取所有账号"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
result = []
for line in lines:
parts = line.split("|")
result.append({
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
})
return result
def read_lines() -> list[str]:
"""读取所有原始行"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
def get_last() -> dict | None:
"""获取最后一个账号"""
accounts = read_all()
return accounts[-1] if accounts else None
def get_last_line() -> str | None:
"""获取最后一行原始数据"""
lines = read_lines()
return lines[-1] if lines else None
def count() -> int:
"""账号总数"""
return len(read_all())
def delete_by_index(index: int) -> dict | None:
"""删除指定序号的账号1-based返回被删除的账号信息"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return None
if index < 1 or index > len(lines):
return None
removed_line = lines.pop(index - 1)
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in lines:
f.write(line + "\n")
parts = removed_line.split("|")
return {
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
}
def delete_by_email(email: str) -> dict | None:
"""按邮箱删除账号"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return None
removed = None
remaining = []
for line in lines:
parts = line.split("|")
if parts[0] == email and removed is None:
removed = {
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
}
else:
remaining.append(line)
if removed:
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in remaining:
f.write(line + "\n")
return removed
def delete_by_emails(emails: list[str]) -> int:
"""批量按邮箱删除账号,返回实际删除数量"""
if not emails:
return 0
email_set = set(emails)
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return 0
remaining = []
deleted = 0
for line in lines:
parts = line.split("|")
if parts[0] in email_set:
deleted += 1
# 同时从 busy 集合中移除
_busy.discard(line)
else:
remaining.append(line)
if deleted > 0:
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in remaining:
f.write(line + "\n")
return deleted
# ====== 统计数据 ======
def _load_stats() -> dict:
try:
with open(_STATS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {
"register_total": 0,
"register_success": 0,
"register_fail": 0,
"register_fail_reasons": {},
"cc_total": 0,
"cc_pass": 0,
"cc_fail": 0,
}
def _save_stats(stats: dict):
with open(_STATS_FILE, "w", encoding="utf-8") as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
def record_register(success: bool, fail_reason: str = ""):
"""记录一次注册结果"""
with _lock:
stats = _load_stats()
stats["register_total"] += 1
if success:
stats["register_success"] += 1
else:
stats["register_fail"] += 1
if fail_reason:
reasons = stats.setdefault("register_fail_reasons", {})
reasons[fail_reason] = reasons.get(fail_reason, 0) + 1
_save_stats(stats)
def record_cc(passed: bool):
"""记录一次 CC 检查结果"""
with _lock:
stats = _load_stats()
stats["cc_total"] += 1
if passed:
stats["cc_pass"] += 1
else:
stats["cc_fail"] += 1
_save_stats(stats)
def get_stats() -> dict:
"""获取统计数据"""
with _lock:
return _load_stats()