""" 代理池管理模块 - 从 proxy.txt 加载代理 - 并发测试代理可用性 - 线程安全的轮询分配 """ import os import time import threading import concurrent.futures from pathlib import Path from urllib.parse import urlparse # 尝试导入 curl_cffi (更好的指纹伪装) try: from curl_cffi import requests as curl_requests CURL_AVAILABLE = True except ImportError: curl_requests = None CURL_AVAILABLE = False import requests BASE_DIR = Path(__file__).parent PROXY_FILE = BASE_DIR / "proxy.txt" # 测试目标 URL TEST_URL = "https://api.openai.com/v1/models" TEST_TIMEOUT = 10 # 秒 def parse_proxy_url(proxy_url: str) -> dict | None: """解析代理 URL,返回结构化信息 支持格式: http://host:port http://username:password@host:port socks5://host:port socks5://username:password@host:port Returns: dict: {"url": str, "scheme": str, "host": str, "port": int, "username": str, "password": str} None: 格式无效 """ proxy_url = proxy_url.strip() if not proxy_url: return None # 确保有 scheme if not proxy_url.startswith(("http://", "https://", "socks5://", "socks4://")): proxy_url = "http://" + proxy_url try: parsed = urlparse(proxy_url) if not parsed.hostname or not parsed.port: return None return { "url": proxy_url, "scheme": parsed.scheme, "host": parsed.hostname, "port": parsed.port, "username": parsed.username or "", "password": parsed.password or "", } except Exception: return None def load_proxies() -> list[str]: """从 proxy.txt 加载代理列表 Returns: list[str]: 代理 URL 列表 """ if not PROXY_FILE.exists(): return [] proxies = [] try: with open(PROXY_FILE, "r", encoding="utf-8") as f: for line in f: line = line.strip() # 跳过空行和注释 if not line or line.startswith("#"): continue # 验证格式 parsed = parse_proxy_url(line) if parsed: proxies.append(parsed["url"]) except Exception: pass return proxies def save_proxies(proxies: list[str]): """保存代理列表到 proxy.txt (保留文件头部注释) Args: proxies: 代理 URL 列表 """ header_lines = [] # 读取原文件头部注释 if PROXY_FILE.exists(): try: with open(PROXY_FILE, "r", encoding="utf-8") as f: for line in f: if line.strip().startswith("#") or not line.strip(): header_lines.append(line.rstrip()) else: break except Exception: pass try: with open(PROXY_FILE, "w", encoding="utf-8") as f: # 写入头部注释 if header_lines: f.write("\n".join(header_lines) + "\n") # 写入代理 for proxy in proxies: f.write(proxy + "\n") except Exception as e: print(f"[ProxyPool] 保存代理文件失败: {e}") def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict: """测试单个代理是否可用 Args: proxy_url: 代理 URL timeout: 超时秒数 Returns: dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str} """ proxies_dict = {"http": proxy_url, "https": proxy_url} start = time.time() try: if CURL_AVAILABLE: resp = curl_requests.head( TEST_URL, proxies=proxies_dict, timeout=timeout, verify=False, impersonate="edge", ) else: resp = requests.head( TEST_URL, proxies=proxies_dict, timeout=timeout, verify=False, ) latency = int((time.time() - start) * 1000) return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""} except Exception as e: latency = int((time.time() - start) * 1000) err_msg = str(e)[:50] return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg} class ProxyPool: """线程安全的代理池""" def __init__(self): self._working_proxies: list[str] = [] self._index = 0 self._lock = threading.Lock() self._last_test_time: float = 0 self._last_test_results: dict = {} # {total, alive, removed} def reload(self) -> int: """从文件重新加载代理 Returns: int: 加载的代理数量 """ with self._lock: self._working_proxies = load_proxies() self._index = 0 return len(self._working_proxies) def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict: """并发测试所有代理,移除不可用的 Args: concurrency: 并发数 timeout: 单个代理超时秒数 Returns: dict: {"total": int, "alive": int, "removed": int, "duration": float, "details": list} """ # 先从文件加载最新 all_proxies = load_proxies() if not all_proxies: self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []} return self._last_test_results total = len(all_proxies) start_time = time.time() # 并发测试 alive_proxies = [] details = [] # 每个代理的详细结果 with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: future_to_proxy = { executor.submit(test_single_proxy, proxy, timeout): proxy for proxy in all_proxies } for future in concurrent.futures.as_completed(future_to_proxy): proxy = future_to_proxy[future] try: result = future.result() details.append(result) if result["alive"]: alive_proxies.append(proxy) except Exception as e: details.append({"proxy": proxy, "alive": False, "latency_ms": 0, "error": str(e)[:50]}) # 按原始顺序排序 details proxy_order = {p: i for i, p in enumerate(all_proxies)} details.sort(key=lambda d: proxy_order.get(d["proxy"], 999)) duration = time.time() - start_time # 更新工作代理池 (保持原始顺序) ordered_alive = [p for p in all_proxies if p in set(alive_proxies)] with self._lock: self._working_proxies = ordered_alive self._index = 0 # 保存存活的代理到文件 dead_count = total - len(ordered_alive) if dead_count > 0: save_proxies(ordered_alive) self._last_test_time = time.time() self._last_test_results = { "total": total, "alive": len(ordered_alive), "removed": dead_count, "duration": round(duration, 1), "details": details, } return self._last_test_results def get_next_proxy(self) -> str | None: """获取下一个代理 (轮询) Returns: str: 代理 URL,池为空时返回 None """ with self._lock: if not self._working_proxies: return None proxy = self._working_proxies[self._index % len(self._working_proxies)] self._index += 1 return proxy def get_proxy_count(self) -> int: """获取当前可用代理数量""" with self._lock: return len(self._working_proxies) def get_status(self) -> dict: """获取代理池状态 Returns: dict: {"count": int, "last_test_time": float, "last_test_results": dict} """ with self._lock: return { "count": len(self._working_proxies), "proxies": list(self._working_proxies), "last_test_time": self._last_test_time, "last_test_results": self._last_test_results, } # ============ 全局单例 ============ _pool = ProxyPool() def get_pool() -> ProxyPool: """获取全局代理池实例""" return _pool def reload_proxies() -> int: """重新加载代理""" return _pool.reload() def test_and_clean_proxies(concurrency: int = 20) -> dict: """并发测试并清理代理""" return _pool.test_and_clean(concurrency=concurrency) def get_next_proxy() -> str | None: """获取下一个代理 (轮询)""" return _pool.get_next_proxy() def get_proxy_count() -> int: """获取可用代理数量""" return _pool.get_proxy_count() def get_proxy_status() -> dict: """获取代理池状态""" return _pool.get_status()