feat: Implement a proxy pool with concurrent testing and integrate proxy management commands into the Telegram bot.

2026-02-11 02:10:11 +08:00
parent 2ff52d5d73
commit be8dd745fb
5 changed files with 537 additions and 5 deletions

proxy_pool.py (new file, 312 lines)
@@ -0,0 +1,312 @@
"""
代理池管理模块
- 从 proxy.txt 加载代理
- 并发测试代理可用性
- 线程安全的轮询分配
"""
import os
import time
import threading
import concurrent.futures
from pathlib import Path
from urllib.parse import urlparse
# Try to import curl_cffi (better browser fingerprint impersonation)
try:
from curl_cffi import requests as curl_requests
CURL_AVAILABLE = True
except ImportError:
curl_requests = None
CURL_AVAILABLE = False
import requests
BASE_DIR = Path(__file__).parent
PROXY_FILE = BASE_DIR / "proxy.txt"
# Target URL for availability tests
TEST_URL = "https://api.openai.com/v1/models"
TEST_TIMEOUT = 10  # seconds
def parse_proxy_url(proxy_url: str) -> dict | None:
"""解析代理 URL返回结构化信息
支持格式:
http://host:port
http://username:password@host:port
socks5://host:port
socks5://username:password@host:port
Returns:
dict: {"url": str, "scheme": str, "host": str, "port": int, "username": str, "password": str}
None: 格式无效
"""
proxy_url = proxy_url.strip()
if not proxy_url:
return None
    # Make sure there is a scheme; default to http://
if not proxy_url.startswith(("http://", "https://", "socks5://", "socks4://")):
proxy_url = "http://" + proxy_url
try:
parsed = urlparse(proxy_url)
if not parsed.hostname or not parsed.port:
return None
return {
"url": proxy_url,
"scheme": parsed.scheme,
"host": parsed.hostname,
"port": parsed.port,
"username": parsed.username or "",
"password": parsed.password or "",
}
except Exception:
return None
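
# Example (illustrative): parse_proxy_url("user:pass@1.2.3.4:8080")
# -> {"url": "http://user:pass@1.2.3.4:8080", "scheme": "http",
#     "host": "1.2.3.4", "port": 8080, "username": "user", "password": "pass"}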
def load_proxies() -> list[str]:
"""从 proxy.txt 加载代理列表
Returns:
list[str]: 代理 URL 列表
"""
if not PROXY_FILE.exists():
return []
proxies = []
try:
with open(PROXY_FILE, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
                # Skip blank lines and comments
if not line or line.startswith("#"):
continue
                # Validate the format
parsed = parse_proxy_url(line)
if parsed:
proxies.append(parsed["url"])
except Exception:
pass
return proxies
def save_proxies(proxies: list[str]):
"""保存代理列表到 proxy.txt (保留文件头部注释)
Args:
proxies: 代理 URL 列表
"""
header_lines = []
    # Read the comment header from the existing file
if PROXY_FILE.exists():
try:
with open(PROXY_FILE, "r", encoding="utf-8") as f:
for line in f:
if line.strip().startswith("#") or not line.strip():
header_lines.append(line.rstrip())
else:
break
except Exception:
pass
try:
with open(PROXY_FILE, "w", encoding="utf-8") as f:
            # Write the header comments
if header_lines:
f.write("\n".join(header_lines) + "\n")
            # Write the proxies
for proxy in proxies:
f.write(proxy + "\n")
except Exception as e:
print(f"[ProxyPool] 保存代理文件失败: {e}")
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
"""测试单个代理是否可用
Args:
proxy_url: 代理 URL
timeout: 超时秒数
Returns:
bool: 代理是否可用
"""
proxies_dict = {"http": proxy_url, "https": proxy_url}
try:
if CURL_AVAILABLE:
            # Use curl_cffi (better fingerprint)
resp = curl_requests.head(
TEST_URL,
proxies=proxies_dict,
timeout=timeout,
verify=False,
impersonate="edge",
)
else:
            # Fall back to requests
resp = requests.head(
TEST_URL,
proxies=proxies_dict,
timeout=timeout,
verify=False,
)
        # Any response counts as success (even 401/403: it means the proxy itself works)
return True
except Exception:
return False
class ProxyPool:
"""线程安全的代理池"""
def __init__(self):
self._working_proxies: list[str] = []
self._index = 0
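        # _index is the round-robin cursor into _working_proxies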
self._lock = threading.Lock()
self._last_test_time: float = 0
        self._last_test_results: dict = {}  # {total, alive, removed, duration}
def reload(self) -> int:
"""从文件重新加载代理
Returns:
int: 加载的代理数量
"""
with self._lock:
self._working_proxies = load_proxies()
self._index = 0
return len(self._working_proxies)
def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict:
"""并发测试所有代理,移除不可用的
Args:
concurrency: 并发数
timeout: 单个代理超时秒数
Returns:
dict: {"total": int, "alive": int, "removed": int, "duration": float}
"""
        # Load the latest list from the file first
all_proxies = load_proxies()
if not all_proxies:
self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0}
return self._last_test_results
total = len(all_proxies)
start_time = time.time()
        # Test concurrently
alive_proxies = []
dead_proxies = []
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
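            # Map each future back to its proxy URL so results can be attributed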
future_to_proxy = {
executor.submit(test_single_proxy, proxy, timeout): proxy
for proxy in all_proxies
}
for future in concurrent.futures.as_completed(future_to_proxy):
proxy = future_to_proxy[future]
try:
is_alive = future.result()
if is_alive:
alive_proxies.append(proxy)
else:
dead_proxies.append(proxy)
except Exception:
dead_proxies.append(proxy)
duration = time.time() - start_time
        # Update the working pool
with self._lock:
self._working_proxies = alive_proxies
self._index = 0
        # Persist the surviving proxies (dropping dead ones)
if dead_proxies:
save_proxies(alive_proxies)
self._last_test_time = time.time()
self._last_test_results = {
"total": total,
"alive": len(alive_proxies),
"removed": len(dead_proxies),
"duration": round(duration, 1),
}
return self._last_test_results
def get_next_proxy(self) -> str | None:
"""获取下一个代理 (轮询)
Returns:
str: 代理 URL池为空时返回 None
"""
with self._lock:
if not self._working_proxies:
return None
proxy = self._working_proxies[self._index % len(self._working_proxies)]
self._index += 1
return proxy
def get_proxy_count(self) -> int:
"""获取当前可用代理数量"""
with self._lock:
return len(self._working_proxies)
def get_status(self) -> dict:
"""获取代理池状态
Returns:
dict: {"count": int, "last_test_time": float, "last_test_results": dict}
"""
with self._lock:
return {
"count": len(self._working_proxies),
"proxies": list(self._working_proxies),
"last_test_time": self._last_test_time,
"last_test_results": self._last_test_results,
}
# ============ Global singleton ============
_pool = ProxyPool()
def get_pool() -> ProxyPool:
"""获取全局代理池实例"""
return _pool
def reload_proxies() -> int:
"""重新加载代理"""
return _pool.reload()
def test_and_clean_proxies(concurrency: int = 20) -> dict:
"""并发测试并清理代理"""
return _pool.test_and_clean(concurrency=concurrency)
def get_next_proxy() -> str | None:
"""获取下一个代理 (轮询)"""
return _pool.get_next_proxy()
def get_proxy_count() -> int:
"""获取可用代理数量"""
return _pool.get_proxy_count()
def get_proxy_status() -> dict:
"""获取代理池状态"""
return _pool.get_status()
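
A minimal usage sketch of the public API above, e.g. from a Telegram bot command handler (the handler wiring is an assumption; only the proxy_pool functions come from this commit):

import proxy_pool

# Hypothetical /proxy_test command body: reload, test, and report.
loaded = proxy_pool.reload_proxies()                 # re-read proxy.txt
results = proxy_pool.test_and_clean_proxies(concurrency=20)
print(f"loaded={loaded}, alive={results['alive']}/{results['total']}, "
      f"removed={results['removed']}, took {results['duration']}s")

# Round-robin allocation for an outbound request.
proxy = proxy_pool.get_next_proxy()                  # None when the pool is empty
if proxy:
    proxies = {"http": proxy, "https": proxy}        # same mapping test_single_proxy uses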