feat: Implement proxy pool management and integrate it into the bot.

This commit is contained in:
2026-02-14 02:52:07 +08:00
parent 768963666a
commit a23b7e8d27
2 changed files with 95 additions and 31 deletions

View File

@@ -238,35 +238,54 @@ class ProxyPool:
logger.warning(f" ❌ 异常 {proxy.masked_url}: {e}")
return {"ok": False, "latency_ms": -1, "error": str(e)}
def test_all(self, progress_callback: Optional[Callable] = None) -> list[dict]:
def test_all(self, progress_callback: Optional[Callable] = None, max_workers: int = 5) -> list[dict]:
"""
测试所有代理,返回结果列表。
并发测试所有代理,返回结果列表。
测试后自动清理优先级过低的代理。
Args:
progress_callback: 可选回调函数,签名 (current, total, result_dict) -> None
每测完一个代理后调用,用于更新前端进度。
max_workers: 并发测试线程数,默认 5。
"""
results = []
from concurrent.futures import ThreadPoolExecutor, as_completed
with self._lock:
proxies_snapshot = list(self._proxies)
total = len(proxies_snapshot)
logger.info(f"📡 开始批量测试 {total} 个代理...")
logger.info(f"📡 开始并发测试 {total} 个代理{max_workers} 并发)...")
for i, proxy in enumerate(proxies_snapshot, 1):
results = [None] * total # 保持顺序
completed = [0] # 用列表以便在闭包中修改
results_lock = threading.Lock()
def _test_proxy(index, proxy):
result = self.test_one(proxy)
result["proxy"] = proxy.masked_url
result["priority"] = proxy.priority
results.append(result)
return index, result
if progress_callback:
try:
progress_callback(i, total, result)
except Exception:
pass
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_test_proxy, i, proxy): i
for i, proxy in enumerate(proxies_snapshot)
}
for future in as_completed(futures):
idx, result = future.result()
results[idx] = result
ok_count = sum(1 for r in results if r["ok"])
with results_lock:
completed[0] += 1
current = completed[0]
if progress_callback:
try:
progress_callback(current, total, result)
except Exception:
pass
ok_count = sum(1 for r in results if r and r["ok"])
fail_count = total - ok_count
logger.info(
f"📡 代理测试完成: ✅ 通过 {ok_count} | ❌ 失败 {fail_count} | "