Files
autoClaude/core/proxy_pool.py

278 lines
9.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
代理池管理模块
功能:
- 从 proxy.txt 加载代理(支持 host:port:user:pass 格式)
- 基于优先级的智能选取(优先使用表现好的代理)
- 自动测试连通性和延迟
- 测试失败降低优先级,过低则淘汰
- 线程安全
"""
import random
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
import requests as std_requests
# --- Configuration constants ---
_PROJECT_ROOT = Path(__file__).parent.parent
_PROXY_FILE = _PROJECT_ROOT / "proxy.txt"
_TEST_URL = "https://claude.ai" # connectivity-test target
_TEST_TIMEOUT = 10 # per-test timeout in seconds
_INITIAL_PRIORITY = 100 # priority assigned to a freshly loaded proxy
_FAIL_PENALTY = 30 # points subtracted per reported failure
_SUCCESS_BONUS = 10 # points added per reported success
_MAX_PRIORITY = 100 # priority ceiling
_REMOVE_THRESHOLD = 0 # proxies at or below this priority are evicted
@dataclass
class Proxy:
    """A single proxy endpoint plus its health/priority bookkeeping."""

    raw: str  # original line from proxy.txt, written back verbatim on save
    url: str  # parsed URL, e.g. http://user:pass@host:port
    host: str
    port: str
    priority: int = _INITIAL_PRIORITY  # selection weight; rises/falls with results
    latency: float = 0.0  # latency of the most recent connectivity test (ms)
    fail_count: int = 0
    success_count: int = 0
    last_test_time: float = 0.0  # epoch seconds of the last connectivity test
    last_test_ok: bool = True

    @property
    def masked_url(self) -> str:
        """Credential-safe display string.

        For authenticated proxies only the first characters of the username
        are shown; the password never appears in the output.
        """
        if "@" in self.url:
            prefix = self.url.split("@")[0]
            # Only scheme://user:pass@... prefixes still contain a ':' once
            # the scheme is stripped, i.e. only those carry credentials.
            if ":" in prefix.replace("http://", "").replace("https://", ""):
                user_part = prefix.split(":")[-2].split("/")[-1]
                return f"{self.host}:{self.port} ({user_part[:8]}...)"
        return f"{self.host}:{self.port}"
def _parse_line(line: str) -> Proxy | None:
    """Parse one line of proxy.txt into a Proxy, or None if it is not valid.

    Supported formats:
      - host:port
      - host:port:user:pass
      - full URLs (http://, https://, socks5://), including URLs that embed
        credentials such as http://user:pass@host:port
    Blank lines and '#' comment lines yield None.
    """
    line = line.strip()
    if not line or line.startswith("#"):
        return None
    # Handle scheme-prefixed URLs FIRST: splitting such a line on ':' would
    # otherwise mis-parse it (e.g. http://user:pass@host:port also splits
    # into 4 parts and used to be read as the host:port:user:pass format,
    # and socks5://host splits into 2 parts and was read as host:port).
    if line.startswith(("http://", "https://", "socks5://")):
        try:
            from urllib.parse import urlparse
            parsed = urlparse(line)
            return Proxy(raw=line, url=line, host=parsed.hostname or "?", port=str(parsed.port or "?"))
        except Exception:
            return None
    parts = line.split(":")
    if len(parts) == 4:
        host, port, user, passwd = parts
        url = f"http://{user}:{passwd}@{host}:{port}"
        return Proxy(raw=line, url=url, host=host, port=port)
    if len(parts) == 2:
        host, port = parts
        url = f"http://{host}:{port}"
        return Proxy(raw=line, url=url, host=host, port=port)
    return None
class ProxyPool:
    """Thread-safe proxy pool.

    Proxies are loaded from proxy.txt, selected by priority-weighted random
    choice, and scored up/down as callers report request outcomes. Proxies
    whose priority falls to or below _REMOVE_THRESHOLD are evicted during
    test_all().
    """

    def __init__(self):
        self._proxies: list[Proxy] = []
        self._lock = threading.Lock()
        self.enabled = True  # master switch; when False, get() forces direct connections
        self._load()

    def _load(self):
        """Populate the pool from proxy.txt; no-op if the file is missing."""
        if not _PROXY_FILE.exists():
            print("[*] 未找到 proxy.txt不使用代理")
            return
        with open(_PROXY_FILE, "r", encoding="utf-8") as f:
            for line in f:
                proxy = _parse_line(line)
                if proxy:
                    self._proxies.append(proxy)
        if self._proxies:
            print(f"[+] 代理池: 已加载 {len(self._proxies)} 个代理")
        else:
            print("[!] proxy.txt 存在但没有有效代理")

    def reload(self):
        """Discard all proxies (and their scores) and re-read proxy.txt."""
        with self._lock:
            self._proxies.clear()
            self._load()

    @property
    def count(self) -> int:
        # Unlocked read: len() on a list is atomic under CPython's GIL and
        # this value is informational only.
        return len(self._proxies)

    @property
    def active_count(self) -> int:
        """Number of proxies still above the eviction threshold."""
        return sum(1 for p in self._proxies if p.priority > _REMOVE_THRESHOLD)

    def get(self) -> dict:
        """Pick one proxy by priority-weighted random choice.

        Returns a requests-style proxies dict, or {} (direct connection)
        when the pool is disabled or no proxy is above the threshold.
        """
        if not self.enabled:
            return {}
        with self._lock:
            alive = [p for p in self._proxies if p.priority > _REMOVE_THRESHOLD]
            if not alive:
                return {}
            # Weighted random: higher priority -> proportionally more likely.
            weights = [p.priority for p in alive]
            chosen = random.choices(alive, weights=weights, k=1)[0]
            return {"http": chosen.url, "https": chosen.url}

    def report_success(self, proxies: dict):
        """Caller feedback: the given proxy served a request successfully."""
        if not proxies:
            return
        url = proxies.get("https", "")
        with self._lock:
            for p in self._proxies:
                if p.url == url:
                    p.success_count += 1
                    p.priority = min(p.priority + _SUCCESS_BONUS, _MAX_PRIORITY)
                    break

    def report_failure(self, proxies: dict):
        """Caller feedback: the given proxy failed a request; lower its priority."""
        if not proxies:
            return
        url = proxies.get("https", "")
        with self._lock:
            for p in self._proxies:
                if p.url == url:
                    p.fail_count += 1
                    p.priority -= _FAIL_PENALTY
                    if p.priority <= _REMOVE_THRESHOLD:
                        # Actual removal happens later in _cleanup(); this
                        # only announces that the proxy is now dead weight.
                        print(f"[!] 代理已淘汰 (优先级归零): {p.masked_url}")
                    break

    def _cleanup(self):
        """Drop proxies at/below the eviction threshold; persist survivors.

        Caller must hold self._lock.
        """
        before = len(self._proxies)
        self._proxies = [p for p in self._proxies if p.priority > _REMOVE_THRESHOLD]
        removed = before - len(self._proxies)
        if removed:
            print(f"[!] 清理了 {removed} 个失效代理,剩余 {len(self._proxies)}")
            self._save()

    def _save(self):
        """Rewrite proxy.txt with the surviving proxies' original raw lines."""
        with open(_PROXY_FILE, "w", encoding="utf-8") as f:
            for p in self._proxies:
                f.write(p.raw + "\n")

    @staticmethod
    def _record_test_failure(proxy: Proxy, error: str) -> dict:
        """Book-keep one failed connectivity test and build its result dict."""
        proxy.last_test_ok = False
        proxy.fail_count += 1
        proxy.priority -= _FAIL_PENALTY
        proxy.last_test_time = time.time()
        return {"ok": False, "latency_ms": -1, "error": error}

    def test_one(self, proxy: Proxy) -> dict:
        """Probe one proxy against _TEST_URL and update its stats in place.

        Any HTTP status below 500 counts as success (the proxy relayed the
        request); transport/proxy errors and 5xx responses count as failure.
        Returns a dict with keys ok, latency_ms, and status or error.
        """
        proxies = {"http": proxy.url, "https": proxy.url}
        try:
            start = time.time()
            resp = std_requests.get(
                _TEST_URL,
                proxies=proxies,
                timeout=_TEST_TIMEOUT,
                allow_redirects=True,
            )
            latency = (time.time() - start) * 1000  # ms
            proxy.latency = latency
            proxy.last_test_time = time.time()
            if resp.status_code < 500:
                proxy.last_test_ok = True
                proxy.success_count += 1
                proxy.priority = min(proxy.priority + _SUCCESS_BONUS, _MAX_PRIORITY)
                return {"ok": True, "latency_ms": round(latency), "status": resp.status_code}
            # 5xx: keep the measured latency in the result, unlike exceptions.
            proxy.last_test_ok = False
            proxy.fail_count += 1
            proxy.priority -= _FAIL_PENALTY
            return {"ok": False, "latency_ms": round(latency), "error": f"HTTP {resp.status_code}"}
        except std_requests.exceptions.ConnectTimeout:
            return self._record_test_failure(proxy, "连接超时")
        except std_requests.exceptions.ProxyError as e:
            return self._record_test_failure(proxy, f"代理错误: {e}")
        except Exception as e:
            return self._record_test_failure(proxy, str(e))

    def test_all(self) -> list[dict]:
        """Test every proxy sequentially, then evict the dead ones.

        The (slow) network tests run outside the lock so that get() and the
        report_* methods stay responsive while testing is in progress.
        """
        results = []
        with self._lock:
            proxies_snapshot = list(self._proxies)
        for proxy in proxies_snapshot:
            result = self.test_one(proxy)
            result["proxy"] = proxy.masked_url
            result["priority"] = proxy.priority
            results.append(result)
        with self._lock:
            self._cleanup()
        return results

    def status_list(self) -> list[dict]:
        """Snapshot of per-proxy stats for display purposes."""
        with self._lock:
            return [
                {
                    "proxy": p.masked_url,
                    "priority": p.priority,
                    "latency_ms": round(p.latency) if p.latency else "-",
                    "success": p.success_count,
                    "fail": p.fail_count,
                    "last_ok": p.last_test_ok,
                }
                for p in self._proxies
            ]
# --- Global singleton ---
# Instantiated at import time so every importing module shares one pool
# (and therefore one lock and one set of priority scores).
pool = ProxyPool()
def get_proxy() -> dict:
    """Module-level convenience wrapper: draw one weighted-random proxy
    (as a requests-style dict, {} for direct) from the shared pool."""
    return pool.get()
def get_proxy_count() -> int:
    """Module-level convenience wrapper: total number of loaded proxies."""
    return pool.count