Files
codexTool/proxy_pool.py

320 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
代理池管理模块
- 从 proxy.txt 加载代理
- 并发测试代理可用性
- 线程安全的轮询分配
"""
import os
import time
import threading
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse
# 尝试导入 curl_cffi (更好的指纹伪装)
try:
from curl_cffi import requests as curl_requests
CURL_AVAILABLE = True
except ImportError:
curl_requests = None
CURL_AVAILABLE = False
import requests
BASE_DIR = Path(__file__).parent
PROXY_FILE = BASE_DIR / "proxy.txt"
# 测试目标 URL
TEST_URL = "https://api.openai.com/v1/models"
TEST_TIMEOUT = 10 # 秒
def parse_proxy_url(proxy_url: str) -> dict | None:
"""解析代理 URL返回结构化信息
支持格式:
http://host:port
http://username:password@host:port
socks5://host:port
socks5://username:password@host:port
Returns:
dict: {"url": str, "scheme": str, "host": str, "port": int, "username": str, "password": str}
None: 格式无效
"""
proxy_url = proxy_url.strip()
if not proxy_url:
return None
# 确保有 scheme
if not proxy_url.startswith(("http://", "https://", "socks5://", "socks4://")):
proxy_url = "http://" + proxy_url
try:
parsed = urlparse(proxy_url)
if not parsed.hostname or not parsed.port:
return None
return {
"url": proxy_url,
"scheme": parsed.scheme,
"host": parsed.hostname,
"port": parsed.port,
"username": parsed.username or "",
"password": parsed.password or "",
}
except Exception:
return None
def load_proxies() -> list[str]:
    """Load the proxy list from proxy.txt.

    Blank lines and ``#`` comment lines are skipped; every remaining line
    is validated through parse_proxy_url. Reading is best-effort: on any
    I/O error the proxies parsed so far are returned.

    Returns:
        list[str]: normalized proxy URLs (empty when the file is missing).
    """
    if not PROXY_FILE.exists():
        return []

    collected: list[str] = []
    try:
        with open(PROXY_FILE, "r", encoding="utf-8") as fh:
            for raw in fh:
                entry = raw.strip()
                # Skip blanks and comments.
                if not entry or entry.startswith("#"):
                    continue
                # Keep only entries that parse as a valid proxy URL.
                info = parse_proxy_url(entry)
                if info is not None:
                    collected.append(info["url"])
    except Exception:
        # Best-effort read: return whatever was parsed before the failure.
        pass
    return collected
def save_proxies(proxies: list[str]):
    """Rewrite proxy.txt with the given proxies, preserving the leading
    comment/blank-line header of the existing file.

    Args:
        proxies: proxy URL list to persist (one per line).
    """
    header: list[str] = []
    if PROXY_FILE.exists():
        # Collect comment and blank lines up to the first real entry.
        try:
            with open(PROXY_FILE, "r", encoding="utf-8") as fh:
                for raw in fh:
                    content = raw.strip()
                    if content and not content.startswith("#"):
                        break
                    header.append(raw.rstrip())
        except Exception:
            pass

    try:
        chunks: list[str] = []
        if header:
            chunks.append("\n".join(header) + "\n")
        chunks.extend(p + "\n" for p in proxies)
        # Single buffered write of header + proxies.
        with open(PROXY_FILE, "w", encoding="utf-8") as fh:
            fh.write("".join(chunks))
    except Exception as e:
        print(f"[ProxyPool] 保存代理文件失败: {e}")
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
    """Probe a single proxy by issuing a HEAD request through it.

    Any response (regardless of status code) counts as alive; any
    exception (timeout, connection refused, ...) counts as dead.

    Args:
        proxy_url: proxy URL to test.
        timeout: per-request timeout in seconds.

    Returns:
        dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str}
    """
    proxy_map = {"http": proxy_url, "https": proxy_url}
    outcome = {"proxy": proxy_url, "alive": True, "latency_ms": 0, "error": ""}
    started = time.time()
    try:
        common = dict(proxies=proxy_map, timeout=timeout, verify=False)
        if CURL_AVAILABLE:
            # curl_cffi path: impersonate a browser TLS fingerprint.
            curl_requests.head(TEST_URL, impersonate="edge", **common)
        else:
            requests.head(TEST_URL, **common)
    except Exception as exc:
        outcome["alive"] = False
        outcome["error"] = str(exc)[:50]
    outcome["latency_ms"] = int((time.time() - started) * 1000)
    return outcome
class ProxyPool:
    """Thread-safe proxy pool with round-robin allocation.

    All mutable state (working list, cursor, last-test metadata) is
    guarded by ``self._lock``.
    """

    def __init__(self):
        self._working_proxies: list[str] = []  # proxies currently considered alive
        self._index = 0                        # round-robin cursor
        self._lock = threading.Lock()          # guards all mutable state below
        self._last_test_time: float = 0
        self._last_test_results: dict = {}     # {total, alive, removed, duration, details}

    def reload(self) -> int:
        """Reload proxies from file and reset the round-robin cursor.

        Returns:
            int: number of proxies loaded.
        """
        with self._lock:
            self._working_proxies = load_proxies()
            self._index = 0
            return len(self._working_proxies)

    def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict:
        """Concurrently test all proxies and drop the unreachable ones.

        Args:
            concurrency: number of worker threads.
            timeout: per-proxy timeout in seconds.

        Returns:
            dict: {"total": int, "alive": int, "removed": int,
                   "duration": float, "details": list}
        """
        # Always start from the latest file contents.
        all_proxies = load_proxies()
        if not all_proxies:
            empty = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []}
            with self._lock:
                self._last_test_results = empty
            return empty

        total = len(all_proxies)
        start_time = time.time()

        # Concurrent probing.
        alive_proxies = []
        details = []  # per-proxy result dicts
        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
            future_to_proxy = {
                executor.submit(test_single_proxy, proxy, timeout): proxy
                for proxy in all_proxies
            }
            for future in concurrent.futures.as_completed(future_to_proxy):
                proxy = future_to_proxy[future]
                try:
                    result = future.result()
                    details.append(result)
                    if result["alive"]:
                        alive_proxies.append(proxy)
                except Exception as e:
                    details.append({"proxy": proxy, "alive": False, "latency_ms": 0, "error": str(e)[:50]})

        # Report details in the original file order. Unknown proxies sort
        # last (was a magic `999`, which broke ordering for >999 proxies).
        proxy_order = {p: i for i, p in enumerate(all_proxies)}
        details.sort(key=lambda d: proxy_order.get(d["proxy"], len(all_proxies)))
        duration = time.time() - start_time

        # FIX: hoist the set out of the comprehension — it used to be rebuilt
        # on every iteration, making this accidentally O(n^2).
        alive_set = set(alive_proxies)
        ordered_alive = [p for p in all_proxies if p in alive_set]

        # Persist survivors only when something was removed.
        dead_count = total - len(ordered_alive)
        if dead_count > 0:
            save_proxies(ordered_alive)

        results = {
            "total": total,
            "alive": len(ordered_alive),
            "removed": dead_count,
            "duration": round(duration, 1),
            "details": details,
        }
        # FIX: update shared state under the lock — get_status() reads these
        # fields while holding it, but they were previously written without it.
        with self._lock:
            self._working_proxies = ordered_alive
            self._index = 0
            self._last_test_time = time.time()
            self._last_test_results = results
        return results

    def get_next_proxy(self) -> str | None:
        """Return the next proxy in round-robin order.

        Returns:
            str: a proxy URL, or None when the pool is empty.
        """
        with self._lock:
            if not self._working_proxies:
                return None
            proxy = self._working_proxies[self._index % len(self._working_proxies)]
            self._index += 1
            return proxy

    def get_proxy_count(self) -> int:
        """Return the number of currently usable proxies."""
        with self._lock:
            return len(self._working_proxies)

    def get_status(self) -> dict:
        """Return a snapshot of the pool state.

        Returns:
            dict: {"count": int, "proxies": list[str],
                   "last_test_time": float, "last_test_results": dict}
        """
        with self._lock:
            return {
                "count": len(self._working_proxies),
                "proxies": list(self._working_proxies),
                "last_test_time": self._last_test_time,
                "last_test_results": self._last_test_results,
            }
# ============ Global singleton ============
_pool = ProxyPool()


def get_pool() -> ProxyPool:
    """Return the global ProxyPool instance."""
    return _pool


def reload_proxies() -> int:
    """Reload proxies from file via the global pool."""
    return _pool.reload()


def test_and_clean_proxies(concurrency: int = 20) -> dict:
    """Concurrently test and prune proxies via the global pool."""
    return _pool.test_and_clean(concurrency=concurrency)


def get_next_proxy() -> str | None:
    """Return the next proxy (round-robin) from the global pool."""
    return _pool.get_next_proxy()


def get_proxy_count() -> int:
    """Return the number of usable proxies in the global pool."""
    return _pool.get_proxy_count()


def get_proxy_status() -> dict:
    """Return the global pool's status snapshot."""
    return _pool.get_status()