Files
autoClaude-TGbot/account_store.py

181 lines
5.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
线程安全的账号存储模块
统一管理 accounts.txt 的读写操作,避免并发冲突。
支持删除、统计等功能。
"""
import json
import threading
from pathlib import Path
_ACCOUNTS_FILE = Path(__file__).parent / "accounts.txt"
_STATS_FILE = Path(__file__).parent / "stats.json"
_lock = threading.Lock()
# ====== 账号操作 ======
def append(email: str, session_key: str, org_uuid: str) -> None:
"""追加一个账号"""
with _lock:
with open(_ACCOUNTS_FILE, "a", encoding="utf-8") as f:
f.write(f"{email}|{session_key}|{org_uuid}\n")
def read_all() -> list[dict]:
"""读取所有账号"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
result = []
for line in lines:
parts = line.split("|")
result.append({
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
})
return result
def read_lines() -> list[str]:
"""读取所有原始行"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
def get_last() -> dict | None:
"""获取最后一个账号"""
accounts = read_all()
return accounts[-1] if accounts else None
def get_last_line() -> str | None:
"""获取最后一行原始数据"""
lines = read_lines()
return lines[-1] if lines else None
def count() -> int:
"""账号总数"""
return len(read_all())
def delete_by_index(index: int) -> dict | None:
"""删除指定序号的账号1-based返回被删除的账号信息"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return None
if index < 1 or index > len(lines):
return None
removed_line = lines.pop(index - 1)
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in lines:
f.write(line + "\n")
parts = removed_line.split("|")
return {
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
}
def delete_by_email(email: str) -> dict | None:
"""按邮箱删除账号"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return None
removed = None
remaining = []
for line in lines:
parts = line.split("|")
if parts[0] == email and removed is None:
removed = {
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
}
else:
remaining.append(line)
if removed:
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in remaining:
f.write(line + "\n")
return removed
# ====== 统计数据 ======
def _load_stats() -> dict:
try:
with open(_STATS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {
"register_total": 0,
"register_success": 0,
"register_fail": 0,
"register_fail_reasons": {},
"cc_total": 0,
"cc_pass": 0,
"cc_fail": 0,
}
def _save_stats(stats: dict):
with open(_STATS_FILE, "w", encoding="utf-8") as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
def record_register(success: bool, fail_reason: str = ""):
"""记录一次注册结果"""
with _lock:
stats = _load_stats()
stats["register_total"] += 1
if success:
stats["register_success"] += 1
else:
stats["register_fail"] += 1
if fail_reason:
reasons = stats.setdefault("register_fail_reasons", {})
reasons[fail_reason] = reasons.get(fail_reason, 0) + 1
_save_stats(stats)
def record_cc(passed: bool):
"""记录一次 CC 检查结果"""
with _lock:
stats = _load_stats()
stats["cc_total"] += 1
if passed:
stats["cc_pass"] += 1
else:
stats["cc_fail"] += 1
_save_stats(stats)
def get_stats() -> dict:
"""获取统计数据"""
with _lock:
return _load_stats()