Files
codexTool/proxy_pool.py

355 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
代理池管理模块
- 从 proxy.txt 加载代理
- 并发测试代理可用性
- 线程安全的轮询分配
"""
import os
import time
import logging
import threading
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse
# 尝试导入 curl_cffi (更好的指纹伪装)
try:
from curl_cffi import requests as curl_requests
CURL_AVAILABLE = True
except ImportError:
curl_requests = None
CURL_AVAILABLE = False
import requests
log = logging.getLogger("proxy_pool")
BASE_DIR = Path(__file__).parent
PROXY_FILE = BASE_DIR / "proxy.txt"
# 测试目标 URL
TEST_URL = "https://api.openai.com/v1/models"
TEST_TIMEOUT = 10 # 秒
def _mask_proxy(proxy_url: str) -> str:
"""脱敏代理 URL (隐藏用户名密码)"""
if "@" in proxy_url:
parts = proxy_url.split("@")
scheme_auth = parts[0]
host_part = parts[-1]
if "://" in scheme_auth:
scheme = scheme_auth.split("://")[0]
return f"{scheme}://***@{host_part}"
return f"***@{host_part}"
return proxy_url
def parse_proxy_url(proxy_url: str) -> dict | None:
"""解析代理 URL返回结构化信息
支持格式:
http://host:port
http://username:password@host:port
socks5://host:port
socks5://username:password@host:port
Returns:
dict: {"url": str, "scheme": str, "host": str, "port": int, "username": str, "password": str}
None: 格式无效
"""
proxy_url = proxy_url.strip()
if not proxy_url:
return None
# 确保有 scheme
if not proxy_url.startswith(("http://", "https://", "socks5://", "socks4://")):
proxy_url = "http://" + proxy_url
try:
parsed = urlparse(proxy_url)
if not parsed.hostname or not parsed.port:
return None
return {
"url": proxy_url,
"scheme": parsed.scheme,
"host": parsed.hostname,
"port": parsed.port,
"username": parsed.username or "",
"password": parsed.password or "",
}
except Exception:
return None
def load_proxies() -> list[str]:
    """Load proxy URLs from proxy.txt.

    Blank lines and ``#`` comment lines are skipped; lines that fail
    ``parse_proxy_url`` are counted and reported with credentials masked.

    Returns:
        list[str]: normalized proxy URLs (empty list if the file is missing
        or unreadable).
    """
    if not PROXY_FILE.exists():
        log.info("[ProxyPool] proxy.txt 不存在")
        return []
    proxies: list[str] = []
    invalid_count = 0
    try:
        with open(PROXY_FILE, "r", encoding="utf-8") as f:
            for line in f:
                line = line.strip()
                if not line or line.startswith("#"):
                    continue
                parsed = parse_proxy_url(line)
                if parsed:
                    proxies.append(parsed["url"])
                else:
                    invalid_count += 1
                    # Mask before logging so user:pass credentials from a
                    # malformed-but-credentialed line never reach the log.
                    log.warning(f"[ProxyPool] 格式无效,已跳过: {_mask_proxy(line)}")
    except Exception as e:
        log.error(f"[ProxyPool] 读取 proxy.txt 失败: {e}")
    log.info(f"[ProxyPool] 从 proxy.txt 加载 {len(proxies)} 个代理" + (f"{invalid_count} 个格式无效" if invalid_count else ""))
    return proxies
def save_proxies(proxies: list[str]):
    """Rewrite proxy.txt with *proxies*, preserving the leading comment header.

    Args:
        proxies: proxy URLs to persist, one per line.
    """
    header_lines: list[str] = []
    if PROXY_FILE.exists():
        try:
            with open(PROXY_FILE, "r", encoding="utf-8") as f:
                for raw in f:
                    stripped = raw.strip()
                    if stripped and not stripped.startswith("#"):
                        break  # header ends at the first real proxy line
                    header_lines.append(raw.rstrip())
        except Exception:
            pass  # best-effort: losing the header comment is not fatal
    try:
        with open(PROXY_FILE, "w", encoding="utf-8") as f:
            if header_lines:
                f.write("\n".join(header_lines) + "\n")
            f.writelines(proxy + "\n" for proxy in proxies)
        log.info(f"[ProxyPool] 已保存 {len(proxies)} 个代理到 proxy.txt")
    except Exception as e:
        log.error(f"[ProxyPool] 保存代理文件失败: {e}")
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
    """Probe one proxy by issuing a HEAD request to TEST_URL through it.

    Any completed response — even 401/403 — counts as alive: the probe
    only checks that the tunnel works, not that the endpoint authorizes us.

    Args:
        proxy_url: proxy URL to test.
        timeout: per-request timeout in seconds.

    Returns:
        dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str}
    """
    proxies_dict = {"http": proxy_url, "https": proxy_url}
    masked = _mask_proxy(proxy_url)
    start = time.time()
    try:
        # NOTE: verify=False disables TLS verification — tolerable for a pure
        # reachability probe, but do not copy this pattern for real traffic.
        if CURL_AVAILABLE:
            curl_requests.head(
                TEST_URL,
                proxies=proxies_dict,
                timeout=timeout,
                verify=False,
                impersonate="edge",  # browser-like TLS fingerprint
            )
        else:
            requests.head(
                TEST_URL,
                proxies=proxies_dict,
                timeout=timeout,
                verify=False,
            )
        latency = int((time.time() - start) * 1000)
        log.info(f"[ProxyPool] ✅ {masked} - {latency}ms")
        return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""}
    except Exception as e:
        latency = int((time.time() - start) * 1000)
        err_msg = str(e)[:80]
        log.info(f"[ProxyPool] ❌ {masked} - 失败 ({err_msg})")
        return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg}
class ProxyPool:
"""线程安全的代理池"""
def __init__(self):
self._working_proxies: list[str] = []
self._index = 0
self._lock = threading.Lock()
self._last_test_time: float = 0
self._last_test_results: dict = {} # {total, alive, removed}
def reload(self) -> int:
"""从文件重新加载代理
Returns:
int: 加载的代理数量
"""
with self._lock:
self._working_proxies = load_proxies()
self._index = 0
count = len(self._working_proxies)
log.info(f"[ProxyPool] 代理池已重新加载,共 {count} 个代理")
return count
def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict:
"""并发测试所有代理,移除不可用的
Args:
concurrency: 并发数
timeout: 单个代理超时秒数
Returns:
dict: {"total": int, "alive": int, "removed": int, "duration": float, "details": list}
"""
# 先从文件加载最新
all_proxies = load_proxies()
if not all_proxies:
log.info("[ProxyPool] 代理池为空,跳过测试")
self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []}
return self._last_test_results
total = len(all_proxies)
start_time = time.time()
log.info(f"[ProxyPool] ========== 开始测试 {total} 个代理 (并发: {concurrency}) ==========")
# 并发测试
alive_proxies = []
details = [] # 每个代理的详细结果
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_proxy = {
executor.submit(test_single_proxy, proxy, timeout): proxy
for proxy in all_proxies
}
for future in concurrent.futures.as_completed(future_to_proxy):
proxy = future_to_proxy[future]
try:
result = future.result()
details.append(result)
if result["alive"]:
alive_proxies.append(proxy)
except Exception as e:
details.append({"proxy": proxy, "alive": False, "latency_ms": 0, "error": str(e)[:50]})
# 按原始顺序排序 details
proxy_order = {p: i for i, p in enumerate(all_proxies)}
details.sort(key=lambda d: proxy_order.get(d["proxy"], 999))
duration = time.time() - start_time
# 更新工作代理池 (保持原始顺序)
ordered_alive = [p for p in all_proxies if p in set(alive_proxies)]
with self._lock:
self._working_proxies = ordered_alive
self._index = 0
# 保存存活的代理到文件
dead_count = total - len(ordered_alive)
if dead_count > 0:
save_proxies(ordered_alive)
# 统计延迟
alive_latencies = [d["latency_ms"] for d in details if d["alive"]]
avg_ms = int(sum(alive_latencies) / len(alive_latencies)) if alive_latencies else 0
log.info(
f"[ProxyPool] ========== 测试完成 =========="
f" | 总计: {total} | 存活: {len(ordered_alive)} | 移除: {dead_count}"
f" | 平均延迟: {avg_ms}ms | 耗时: {round(duration, 1)}s"
)
self._last_test_time = time.time()
self._last_test_results = {
"total": total,
"alive": len(ordered_alive),
"removed": dead_count,
"duration": round(duration, 1),
"details": details,
}
return self._last_test_results
def get_next_proxy(self) -> str | None:
"""获取下一个代理 (轮询)
Returns:
str: 代理 URL池为空时返回 None
"""
with self._lock:
if not self._working_proxies:
return None
proxy = self._working_proxies[self._index % len(self._working_proxies)]
self._index += 1
return proxy
def get_proxy_count(self) -> int:
"""获取当前可用代理数量"""
with self._lock:
return len(self._working_proxies)
def get_status(self) -> dict:
"""获取代理池状态
Returns:
dict: {"count": int, "last_test_time": float, "last_test_results": dict}
"""
with self._lock:
return {
"count": len(self._working_proxies),
"proxies": list(self._working_proxies),
"last_test_time": self._last_test_time,
"last_test_results": self._last_test_results,
}
# ============ Global singleton ============
_pool = ProxyPool()

def get_pool() -> ProxyPool:
    """Return the global ProxyPool instance."""
    return _pool

def reload_proxies() -> int:
    """Reload proxies from proxy.txt into the global pool."""
    return _pool.reload()

def test_and_clean_proxies(concurrency: int = 20) -> dict:
    """Concurrently test all proxies in the global pool and remove dead ones."""
    return _pool.test_and_clean(concurrency=concurrency)

def get_next_proxy() -> str | None:
    """Return the next proxy (round-robin); None when the pool is empty."""
    return _pool.get_next_proxy()

def get_proxy_count() -> int:
    """Return the number of currently usable proxies."""
    return _pool.get_proxy_count()

def get_proxy_status() -> dict:
    """Return a snapshot of the global pool's status."""
    return _pool.get_status()