""" 代理池管理模块 功能: - 从 proxy.txt 加载代理(支持 host:port:user:pass 格式) - 基于优先级的智能选取(优先使用表现好的代理) - 自动测试连通性和延迟 - 测试失败降低优先级,过低则淘汰 - 线程安全 """ import random import threading import time from dataclasses import dataclass, field from pathlib import Path import requests as std_requests # --- 配置常量 --- _PROXY_FILE = Path(__file__).parent / "proxy.txt" _TEST_URL = "https://claude.ai" # 测试目标 _TEST_TIMEOUT = 10 # 测试超时秒数 _INITIAL_PRIORITY = 100 # 初始优先级 _FAIL_PENALTY = 30 # 每次失败扣分 _SUCCESS_BONUS = 10 # 每次成功加分 _MAX_PRIORITY = 100 # 最高优先级 _REMOVE_THRESHOLD = 0 # 优先级低于此值则淘汰 @dataclass class Proxy: """代理实例""" raw: str # 原始行 url: str # 解析后的 URL (http://user:pass@host:port) host: str port: str priority: int = _INITIAL_PRIORITY latency: float = 0.0 # 最近一次测试延迟 (ms) fail_count: int = 0 success_count: int = 0 last_test_time: float = 0.0 last_test_ok: bool = True @property def masked_url(self) -> str: """脱敏显示""" if "@" in self.url: prefix = self.url.split("@")[0] suffix = self.url.split("@")[1] # 隐藏密码 if ":" in prefix.replace("http://", "").replace("https://", ""): user_part = prefix.split(":")[-2].split("/")[-1] return f"{self.host}:{self.port} ({user_part[:8]}...)" return f"{self.host}:{self.port}" def _parse_line(line: str) -> Proxy | None: """解析一行代理配置""" line = line.strip() if not line or line.startswith("#"): return None parts = line.split(":") if len(parts) == 4: host, port, user, passwd = parts url = f"http://{user}:{passwd}@{host}:{port}" return Proxy(raw=line, url=url, host=host, port=port) elif len(parts) == 2: host, port = parts url = f"http://{host}:{port}" return Proxy(raw=line, url=url, host=host, port=port) elif line.startswith(("http://", "https://", "socks5://")): # 从完整 URL 提取 host:port try: from urllib.parse import urlparse parsed = urlparse(line) return Proxy(raw=line, url=line, host=parsed.hostname or "?", port=str(parsed.port or "?")) except Exception: return None return None class ProxyPool: """线程安全的代理池""" def __init__(self): self._proxies: list[Proxy] = [] self._lock = threading.Lock() self.enabled = True # 代理开关 self._load() def _load(self): """从 proxy.txt 加载代理""" if not _PROXY_FILE.exists(): print("[*] 未找到 proxy.txt,不使用代理") return with open(_PROXY_FILE, "r", encoding="utf-8") as f: for line in f: proxy = _parse_line(line) if proxy: self._proxies.append(proxy) if self._proxies: print(f"[+] 代理池: 已加载 {len(self._proxies)} 个代理") else: print("[!] proxy.txt 存在但没有有效代理") def reload(self): """重新加载 proxy.txt""" with self._lock: self._proxies.clear() self._load() @property def count(self) -> int: return len(self._proxies) @property def active_count(self) -> int: """有效代理数量""" return sum(1 for p in self._proxies if p.priority > _REMOVE_THRESHOLD) def get(self) -> dict: """ 基于优先级加权随机选取一个代理,返回 requests 格式的 proxies dict。 代理关闭或无可用代理时返回空 dict(直连)。 """ if not self.enabled: return {} with self._lock: alive = [p for p in self._proxies if p.priority > _REMOVE_THRESHOLD] if not alive: return {} # 加权随机:priority 越高越容易选中 weights = [p.priority for p in alive] chosen = random.choices(alive, weights=weights, k=1)[0] return {"http": chosen.url, "https": chosen.url} def report_success(self, proxies: dict): """调用方报告该代理请求成功""" if not proxies: return url = proxies.get("https", "") with self._lock: for p in self._proxies: if p.url == url: p.success_count += 1 p.priority = min(p.priority + _SUCCESS_BONUS, _MAX_PRIORITY) break def report_failure(self, proxies: dict): """调用方报告该代理请求失败,降低优先级""" if not proxies: return url = proxies.get("https", "") with self._lock: for p in self._proxies: if p.url == url: p.fail_count += 1 p.priority -= _FAIL_PENALTY if p.priority <= _REMOVE_THRESHOLD: print(f"[!] 代理已淘汰 (优先级归零): {p.masked_url}") break def _cleanup(self): """移除优先级过低的代理""" before = len(self._proxies) self._proxies = [p for p in self._proxies if p.priority > _REMOVE_THRESHOLD] removed = before - len(self._proxies) if removed: print(f"[!] 清理了 {removed} 个失效代理,剩余 {len(self._proxies)} 个") self._save() def _save(self): """将当前有效代理写回 proxy.txt""" with open(_PROXY_FILE, "w", encoding="utf-8") as f: for p in self._proxies: f.write(p.raw + "\n") def test_one(self, proxy: Proxy) -> dict: """测试单个代理,返回结果 dict""" proxies = {"http": proxy.url, "https": proxy.url} try: start = time.time() resp = std_requests.get( _TEST_URL, proxies=proxies, timeout=_TEST_TIMEOUT, allow_redirects=True, ) latency = (time.time() - start) * 1000 # ms proxy.latency = latency proxy.last_test_time = time.time() if resp.status_code < 500: proxy.last_test_ok = True proxy.success_count += 1 proxy.priority = min(proxy.priority + _SUCCESS_BONUS, _MAX_PRIORITY) return {"ok": True, "latency_ms": round(latency), "status": resp.status_code} else: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY return {"ok": False, "latency_ms": round(latency), "error": f"HTTP {resp.status_code}"} except std_requests.exceptions.ConnectTimeout: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY proxy.last_test_time = time.time() return {"ok": False, "latency_ms": -1, "error": "连接超时"} except std_requests.exceptions.ProxyError as e: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY proxy.last_test_time = time.time() return {"ok": False, "latency_ms": -1, "error": f"代理错误: {e}"} except Exception as e: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY proxy.last_test_time = time.time() return {"ok": False, "latency_ms": -1, "error": str(e)} def test_all(self) -> list[dict]: """ 测试所有代理,返回结果列表。 测试后自动清理优先级过低的代理。 """ results = [] with self._lock: proxies_snapshot = list(self._proxies) for proxy in proxies_snapshot: result = self.test_one(proxy) result["proxy"] = proxy.masked_url result["priority"] = proxy.priority results.append(result) with self._lock: self._cleanup() return results def status_list(self) -> list[dict]: """返回所有代理的状态信息""" with self._lock: return [ { "proxy": p.masked_url, "priority": p.priority, "latency_ms": round(p.latency) if p.latency else "-", "success": p.success_count, "fail": p.fail_count, "last_ok": p.last_test_ok, } for p in self._proxies ] # --- 全局单例 --- pool = ProxyPool() def get_proxy() -> dict: """供外部模块调用:随机获取一个代理""" return pool.get() def get_proxy_count() -> int: """代理池大小""" return pool.count