""" 线程安全的账号存储模块 统一管理 accounts.txt 的读写操作,避免并发冲突。 支持删除、统计等功能。 """ import json import threading from pathlib import Path _PROJECT_ROOT = Path(__file__).parent.parent _ACCOUNTS_FILE = _PROJECT_ROOT / "accounts.txt" _STATS_FILE = _PROJECT_ROOT / "stats.json" _lock = threading.Lock() # ====== 账号池(并发调度)====== # _busy 记录当前被占用的账号行(原始字符串) _busy: set[str] = set() def acquire(n: int = 0) -> list[str]: """从空闲账号中获取最多 n 个,标记为占用。 Args: n: 需要的账号数。0 表示尽量获取所有空闲账号(至少 1 个)。 Returns: 被获取的账号行列表(可能少于 n),为空表示没有空闲账号。 """ with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: all_lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: return [] free = [line for line in all_lines if line not in _busy] if not free: return [] if n <= 0: # 获取全部空闲 acquired = free else: acquired = free[:n] _busy.update(acquired) return acquired def release(lines: list[str]) -> None: """释放指定账号,标记为空闲。""" with _lock: for line in lines: _busy.discard(line) def pool_status() -> dict: """返回账号池状态。""" with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: all_lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: all_lines = [] total = len(all_lines) busy = sum(1 for line in all_lines if line in _busy) return {"total": total, "busy": busy, "free": total - busy} # ====== 账号操作 ====== def append(email: str, session_key: str, org_uuid: str) -> None: """追加一个账号""" with _lock: with open(_ACCOUNTS_FILE, "a", encoding="utf-8") as f: f.write(f"{email}|{session_key}|{org_uuid}\n") def read_all() -> list[dict]: """读取所有账号""" with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: return [] result = [] for line in lines: parts = line.split("|") result.append({ "email": parts[0] if len(parts) > 0 else "", "session_key": parts[1] if len(parts) > 1 else "", "org_uuid": parts[2] if len(parts) > 2 else "", }) return result def read_lines() -> list[str]: """读取所有原始行""" with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: return [line.strip() for line in f if line.strip()] except FileNotFoundError: return [] def get_last() -> dict | None: """获取最后一个账号""" accounts = read_all() return accounts[-1] if accounts else None def get_last_line() -> str | None: """获取最后一行原始数据""" lines = read_lines() return lines[-1] if lines else None def count() -> int: """账号总数""" return len(read_all()) def delete_by_index(index: int) -> dict | None: """删除指定序号的账号(1-based),返回被删除的账号信息""" with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: return None if index < 1 or index > len(lines): return None removed_line = lines.pop(index - 1) with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f: for line in lines: f.write(line + "\n") parts = removed_line.split("|") return { "email": parts[0] if len(parts) > 0 else "", "session_key": parts[1] if len(parts) > 1 else "", "org_uuid": parts[2] if len(parts) > 2 else "", } def delete_by_email(email: str) -> dict | None: """按邮箱删除账号""" with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: return None removed = None remaining = [] for line in lines: parts = line.split("|") if parts[0] == email and removed is None: removed = { "email": parts[0] if len(parts) > 0 else "", "session_key": parts[1] if len(parts) > 1 else "", "org_uuid": parts[2] if len(parts) > 2 else "", } else: remaining.append(line) if removed: with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f: for line in remaining: f.write(line + "\n") return removed def delete_by_emails(emails: list[str]) -> int: """批量按邮箱删除账号,返回实际删除数量""" if not emails: return 0 email_set = set(emails) with _lock: try: with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f: lines = [line.strip() for line in f if line.strip()] except FileNotFoundError: return 0 remaining = [] deleted = 0 for line in lines: parts = line.split("|") if parts[0] in email_set: deleted += 1 # 同时从 busy 集合中移除 _busy.discard(line) else: remaining.append(line) if deleted > 0: with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f: for line in remaining: f.write(line + "\n") return deleted # ====== 统计数据 ====== def _load_stats() -> dict: try: with open(_STATS_FILE, "r", encoding="utf-8") as f: return json.load(f) except (FileNotFoundError, json.JSONDecodeError): return { "register_total": 0, "register_success": 0, "register_fail": 0, "register_fail_reasons": {}, "cc_total": 0, "cc_pass": 0, "cc_fail": 0, } def _save_stats(stats: dict): with open(_STATS_FILE, "w", encoding="utf-8") as f: json.dump(stats, f, ensure_ascii=False, indent=2) def record_register(success: bool, fail_reason: str = ""): """记录一次注册结果""" with _lock: stats = _load_stats() stats["register_total"] += 1 if success: stats["register_success"] += 1 else: stats["register_fail"] += 1 if fail_reason: reasons = stats.setdefault("register_fail_reasons", {}) reasons[fail_reason] = reasons.get(fail_reason, 0) + 1 _save_stats(stats) def record_cc(passed: bool): """记录一次 CC 检查结果""" with _lock: stats = _load_stats() stats["cc_total"] += 1 if passed: stats["cc_pass"] += 1 else: stats["cc_fail"] += 1 _save_stats(stats) def get_stats() -> dict: """获取统计数据""" with _lock: return _load_stats()