Files
codexTool/proxy_pool.py

313 lines
8.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
代理池管理模块
- 从 proxy.txt 加载代理
- 并发测试代理可用性
- 线程安全的轮询分配
"""
import os
import time
import threading
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse
# 尝试导入 curl_cffi (更好的指纹伪装)
try:
from curl_cffi import requests as curl_requests
CURL_AVAILABLE = True
except ImportError:
curl_requests = None
CURL_AVAILABLE = False
import requests
# Directory containing this module; the proxy list file is resolved against it.
BASE_DIR = Path(__file__).parent
PROXY_FILE = BASE_DIR / "proxy.txt"
# URL requested through each proxy when checking whether it works
TEST_URL = "https://api.openai.com/v1/models"
TEST_TIMEOUT = 10 # seconds
def parse_proxy_url(proxy_url: str) -> dict | None:
"""解析代理 URL返回结构化信息
支持格式:
http://host:port
http://username:password@host:port
socks5://host:port
socks5://username:password@host:port
Returns:
dict: {"url": str, "scheme": str, "host": str, "port": int, "username": str, "password": str}
None: 格式无效
"""
proxy_url = proxy_url.strip()
if not proxy_url:
return None
# 确保有 scheme
if not proxy_url.startswith(("http://", "https://", "socks5://", "socks4://")):
proxy_url = "http://" + proxy_url
try:
parsed = urlparse(proxy_url)
if not parsed.hostname or not parsed.port:
return None
return {
"url": proxy_url,
"scheme": parsed.scheme,
"host": parsed.hostname,
"port": parsed.port,
"username": parsed.username or "",
"password": parsed.password or "",
}
except Exception:
return None
def load_proxies() -> list[str]:
    """Load the proxy list from ``PROXY_FILE``.

    Blank lines and lines starting with ``#`` are skipped, and so is every
    entry that ``parse_proxy_url`` rejects. Accepted entries are returned in
    the form stored under the parser's ``"url"`` key.

    Returns:
        list[str]: Proxy URLs in file order. Empty when the file does not
            exist. If reading fails part-way, the error is printed and the
            entries collected up to that point are returned.
    """
    proxies: list[str] = []
    try:
        # "utf-8-sig" drops a leading byte-order mark (as written by some
        # editors) that would otherwise be glued onto the first entry.
        with open(PROXY_FILE, "r", encoding="utf-8-sig") as f:
            for raw_line in f:
                entry = raw_line.strip()
                # Skip blank lines and comments
                if not entry or entry.startswith("#"):
                    continue
                # Keep only entries that parse as a valid proxy URL
                parsed = parse_proxy_url(entry)
                if parsed:
                    proxies.append(parsed["url"])
    except FileNotFoundError:
        # A missing proxy file simply means there are no proxies.
        pass
    except (OSError, UnicodeError) as e:
        # Stay best-effort, but report the failure the same way
        # save_proxies() does instead of hiding it.
        print(f"[ProxyPool] 读取代理文件失败: {e}")
    return proxies
def save_proxies(proxies: list[str]):
    """Overwrite ``PROXY_FILE`` with ``proxies``, keeping its leading header.

    The "header" is the run of comment (``#``) and blank lines at the very top
    of the existing file; everything after the first other line is replaced.
    A failure while reading the header is ignored; a failure while writing is
    printed and not raised.

    Args:
        proxies: Proxy URLs to write, one per line.
    """
    preserved_header = []

    # Collect the comment/blank lines that open the current file, if any.
    if PROXY_FILE.exists():
        try:
            with open(PROXY_FILE, "r", encoding="utf-8") as src:
                for raw in src:
                    text = raw.strip()
                    if text and not text.startswith("#"):
                        # First real entry: the header ends here.
                        break
                    preserved_header.append(raw.rstrip())
        except Exception:
            pass

    try:
        with open(PROXY_FILE, "w", encoding="utf-8") as dst:
            # Header first ...
            if preserved_header:
                dst.write("\n".join(preserved_header) + "\n")
            # ... then one proxy per line.
            dst.writelines(entry + "\n" for entry in proxies)
    except Exception as e:
        print(f"[ProxyPool] 保存代理文件失败: {e}")
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
    """Check whether a single proxy can carry a request to ``TEST_URL``.

    A HEAD request is sent through the proxy with certificate verification
    disabled. ``curl_cffi`` is used when ``CURL_AVAILABLE`` is set, otherwise
    ``requests``.

    Args:
        proxy_url: Proxy URL used for both HTTP and HTTPS traffic.
        timeout: Request timeout in seconds.

    Returns:
        bool: True if any response came back, False if the request raised.
    """
    request_kwargs = {
        "proxies": {"http": proxy_url, "https": proxy_url},
        "timeout": timeout,
        "verify": False,
    }
    try:
        if CURL_AVAILABLE:
            # curl_cffi path (chosen for its better fingerprint)
            curl_requests.head(TEST_URL, impersonate="edge", **request_kwargs)
        else:
            # Fallback: plain requests
            requests.head(TEST_URL, **request_kwargs)
    except Exception:
        return False
    # Any response counts as success (401/403 included): it shows the proxy
    # itself is passing traffic.
    return True
class ProxyPool:
    """Thread-safe pool of working proxies handed out round-robin.

    All mutable state (the proxy list, the rotation cursor and the record of
    the last test run) is read and written while holding ``self._lock``.
    """

    def __init__(self):
        self._working_proxies: list[str] = []
        # Ever-increasing cursor; reduced modulo the pool size on each read.
        self._index = 0
        self._lock = threading.Lock()
        # Wall-clock timestamp of the last completed test run (0 = never).
        self._last_test_time: float = 0
        self._last_test_results: dict = {}  # {total, alive, removed, duration}

    def reload(self) -> int:
        """Replace the pool with the proxies currently listed in the file.

        The rotation cursor is reset. Proxies are NOT tested here.

        Returns:
            int: Number of proxies loaded.
        """
        with self._lock:
            self._working_proxies = load_proxies()
            self._index = 0
            return len(self._working_proxies)

    def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict:
        """Test every proxy from the file concurrently and drop the dead ones.

        Surviving proxies become the new pool (in their original file order)
        and, if at least one proxy failed, are written back to the file.

        Args:
            concurrency: Number of worker threads; values below 1 are treated
                as 1.
            timeout: Per-proxy timeout in seconds.

        Returns:
            dict: {"total": int, "alive": int, "removed": int, "duration": float}
        """
        # Always start from the latest file contents.
        all_proxies = load_proxies()
        if not all_proxies:
            # Nothing to test. NOTE: the in-memory pool and the last-test
            # timestamp are deliberately left as they were in this case.
            results = {"total": 0, "alive": 0, "removed": 0, "duration": 0}
            with self._lock:
                self._last_test_results = dict(results)
            return results

        total = len(all_proxies)
        # Monotonic clock: the measured duration cannot be skewed by
        # wall-clock adjustments made while the probes are running.
        started = time.monotonic()

        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max(1, concurrency)
        ) as executor:
            futures = [
                executor.submit(test_single_proxy, proxy, timeout)
                for proxy in all_proxies
            ]
        # Leaving the ``with`` block waits for every probe to finish.

        # Walk the results in submission order so the surviving list (and the
        # rewritten file) keeps the original ordering instead of being
        # reshuffled by whichever probe happened to finish first.
        alive_proxies: list[str] = []
        dead_proxies: list[str] = []
        for proxy, future in zip(all_proxies, futures):
            try:
                is_alive = future.result()
            except Exception:
                is_alive = False
            (alive_proxies if is_alive else dead_proxies).append(proxy)

        duration = time.monotonic() - started
        results = {
            "total": total,
            "alive": len(alive_proxies),
            "removed": len(dead_proxies),
            "duration": round(duration, 1),
        }

        # Publish the new pool and the test record together so get_status()
        # can never observe one without the other.
        with self._lock:
            self._working_proxies = alive_proxies
            self._index = 0
            self._last_test_time = time.time()
            # Keep a private copy; the caller receives its own dict.
            self._last_test_results = dict(results)

        # Persist survivors (i.e. remove dead proxies from the file). Done
        # outside the lock so file I/O does not block proxy hand-out.
        if dead_proxies:
            save_proxies(alive_proxies)

        return results

    def get_next_proxy(self) -> str | None:
        """Return the next proxy in round-robin order.

        Returns:
            str: Proxy URL, or None when the pool is empty.
        """
        with self._lock:
            if not self._working_proxies:
                return None
            proxy = self._working_proxies[self._index % len(self._working_proxies)]
            self._index += 1
            return proxy

    def get_proxy_count(self) -> int:
        """Return the number of proxies currently in the pool."""
        with self._lock:
            return len(self._working_proxies)

    def get_status(self) -> dict:
        """Return a snapshot of the pool state.

        The returned containers are copies; mutating them does not affect the
        pool.

        Returns:
            dict: {"count": int, "proxies": list[str],
                "last_test_time": float, "last_test_results": dict}
        """
        with self._lock:
            return {
                "count": len(self._working_proxies),
                "proxies": list(self._working_proxies),
                "last_test_time": self._last_test_time,
                "last_test_results": dict(self._last_test_results),
            }
# ============ Global singleton ============
_pool = ProxyPool()
def get_pool() -> ProxyPool:
    """Return the module-wide shared ``ProxyPool`` instance."""
    shared_pool = _pool
    return shared_pool
def reload_proxies() -> int:
    """Reload the shared pool via its ``reload()`` method and return the result."""
    loaded_count = _pool.reload()
    return loaded_count
def test_and_clean_proxies(concurrency: int = 20, timeout: int | None = None) -> dict:
    """Run ``test_and_clean`` on the shared pool and return its result dict.

    Args:
        concurrency: Number of proxies tested in parallel.
        timeout: Per-proxy timeout in seconds. ``None`` (the default) leaves
            the pool method's own default timeout in effect.

    Returns:
        dict: The result dictionary returned by ``ProxyPool.test_and_clean``.
    """
    if timeout is None:
        # Do not forward the argument so the method's default applies.
        return _pool.test_and_clean(concurrency=concurrency)
    return _pool.test_and_clean(concurrency=concurrency, timeout=timeout)
def get_next_proxy() -> str | None:
    """Return the shared pool's next proxy (round-robin), or None if it has none."""
    next_proxy = _pool.get_next_proxy()
    return next_proxy
def get_proxy_count() -> int:
    """Return how many proxies the shared pool currently holds."""
    available = _pool.get_proxy_count()
    return available
def get_proxy_status() -> dict:
    """Return the status dictionary produced by the shared pool's ``get_status()``."""
    status_snapshot = _pool.get_status()
    return status_snapshot