refactor: enhance proxy pool logging with masked URLs and decrease proxy test concurrency from 20 to 10.

This commit is contained in:
2026-02-11 02:22:18 +08:00
parent 660d43161d
commit 85c270f55f
2 changed files with 51 additions and 16 deletions

View File

@@ -7,6 +7,7 @@
import os import os
import time import time
import logging
import threading import threading
import concurrent.futures import concurrent.futures
from pathlib import Path from pathlib import Path
@@ -22,6 +23,8 @@ except ImportError:
import requests import requests
log = logging.getLogger("proxy_pool")
BASE_DIR = Path(__file__).parent BASE_DIR = Path(__file__).parent
PROXY_FILE = BASE_DIR / "proxy.txt" PROXY_FILE = BASE_DIR / "proxy.txt"
@@ -30,6 +33,19 @@ TEST_URL = "https://api.openai.com/v1/models"
TEST_TIMEOUT = 10 # 秒 TEST_TIMEOUT = 10 # 秒
def _mask_proxy(proxy_url: str) -> str:
"""脱敏代理 URL (隐藏用户名密码)"""
if "@" in proxy_url:
parts = proxy_url.split("@")
scheme_auth = parts[0]
host_part = parts[-1]
if "://" in scheme_auth:
scheme = scheme_auth.split("://")[0]
return f"{scheme}://***@{host_part}"
return f"***@{host_part}"
return proxy_url
def parse_proxy_url(proxy_url: str) -> dict | None: def parse_proxy_url(proxy_url: str) -> dict | None:
"""解析代理 URL返回结构化信息 """解析代理 URL返回结构化信息
@@ -75,23 +91,27 @@ def load_proxies() -> list[str]:
list[str]: 代理 URL 列表 list[str]: 代理 URL 列表
""" """
if not PROXY_FILE.exists(): if not PROXY_FILE.exists():
log.info("[ProxyPool] proxy.txt 不存在")
return [] return []
proxies = [] proxies = []
invalid_count = 0
try: try:
with open(PROXY_FILE, "r", encoding="utf-8") as f: with open(PROXY_FILE, "r", encoding="utf-8") as f:
for line in f: for line in f:
line = line.strip() line = line.strip()
# 跳过空行和注释
if not line or line.startswith("#"): if not line or line.startswith("#"):
continue continue
# 验证格式
parsed = parse_proxy_url(line) parsed = parse_proxy_url(line)
if parsed: if parsed:
proxies.append(parsed["url"]) proxies.append(parsed["url"])
except Exception: else:
pass invalid_count += 1
log.warning(f"[ProxyPool] 格式无效,已跳过: {line}")
except Exception as e:
log.error(f"[ProxyPool] 读取 proxy.txt 失败: {e}")
log.info(f"[ProxyPool] 从 proxy.txt 加载 {len(proxies)} 个代理" + (f"{invalid_count} 个格式无效" if invalid_count else ""))
return proxies return proxies
@@ -103,7 +123,6 @@ def save_proxies(proxies: list[str]):
""" """
header_lines = [] header_lines = []
# 读取原文件头部注释
if PROXY_FILE.exists(): if PROXY_FILE.exists():
try: try:
with open(PROXY_FILE, "r", encoding="utf-8") as f: with open(PROXY_FILE, "r", encoding="utf-8") as f:
@@ -117,14 +136,13 @@ def save_proxies(proxies: list[str]):
try: try:
with open(PROXY_FILE, "w", encoding="utf-8") as f: with open(PROXY_FILE, "w", encoding="utf-8") as f:
# 写入头部注释
if header_lines: if header_lines:
f.write("\n".join(header_lines) + "\n") f.write("\n".join(header_lines) + "\n")
# 写入代理
for proxy in proxies: for proxy in proxies:
f.write(proxy + "\n") f.write(proxy + "\n")
log.info(f"[ProxyPool] 已保存 {len(proxies)} 个代理到 proxy.txt")
except Exception as e: except Exception as e:
print(f"[ProxyPool] 保存代理文件失败: {e}") log.error(f"[ProxyPool] 保存代理文件失败: {e}")
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict: def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
@@ -138,6 +156,7 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str} dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str}
""" """
proxies_dict = {"http": proxy_url, "https": proxy_url} proxies_dict = {"http": proxy_url, "https": proxy_url}
masked = _mask_proxy(proxy_url)
start = time.time() start = time.time()
try: try:
@@ -157,10 +176,12 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
verify=False, verify=False,
) )
latency = int((time.time() - start) * 1000) latency = int((time.time() - start) * 1000)
log.info(f"[ProxyPool] ✅ {masked} - {latency}ms")
return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""} return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""}
except Exception as e: except Exception as e:
latency = int((time.time() - start) * 1000) latency = int((time.time() - start) * 1000)
err_msg = str(e)[:50] err_msg = str(e)[:80]
log.info(f"[ProxyPool] ❌ {masked} - 失败 ({err_msg})")
return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg} return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg}
@@ -176,14 +197,16 @@ class ProxyPool:
def reload(self) -> int: def reload(self) -> int:
"""从文件重新加载代理 """从文件重新加载代理
Returns: Returns:
int: 加载的代理数量 int: 加载的代理数量
""" """
with self._lock: with self._lock:
self._working_proxies = load_proxies() self._working_proxies = load_proxies()
self._index = 0 self._index = 0
return len(self._working_proxies) count = len(self._working_proxies)
log.info(f"[ProxyPool] 代理池已重新加载,共 {count} 个代理")
return count
def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict: def test_and_clean(self, concurrency: int = 20, timeout: int = TEST_TIMEOUT) -> dict:
"""并发测试所有代理,移除不可用的 """并发测试所有代理,移除不可用的
@@ -198,11 +221,13 @@ class ProxyPool:
# 先从文件加载最新 # 先从文件加载最新
all_proxies = load_proxies() all_proxies = load_proxies()
if not all_proxies: if not all_proxies:
log.info("[ProxyPool] 代理池为空,跳过测试")
self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []} self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []}
return self._last_test_results return self._last_test_results
total = len(all_proxies) total = len(all_proxies)
start_time = time.time() start_time = time.time()
log.info(f"[ProxyPool] ========== 开始测试 {total} 个代理 (并发: {concurrency}) ==========")
# 并发测试 # 并发测试
alive_proxies = [] alive_proxies = []
@@ -241,6 +266,16 @@ class ProxyPool:
if dead_count > 0: if dead_count > 0:
save_proxies(ordered_alive) save_proxies(ordered_alive)
# 统计延迟
alive_latencies = [d["latency_ms"] for d in details if d["alive"]]
avg_ms = int(sum(alive_latencies) / len(alive_latencies)) if alive_latencies else 0
log.info(
f"[ProxyPool] ========== 测试完成 =========="
f" | 总计: {total} | 存活: {len(ordered_alive)} | 移除: {dead_count}"
f" | 平均延迟: {avg_ms}ms | 耗时: {round(duration, 1)}s"
)
self._last_test_time = time.time() self._last_test_time = time.time()
self._last_test_results = { self._last_test_results = {
"total": total, "total": total,

View File

@@ -3974,13 +3974,13 @@ class ProvisionerBot:
await self.app.bot.send_message( await self.app.bot.send_message(
chat_id, chat_id,
f"🌐 <b>代理池预检测</b>\n\n" f"🌐 <b>代理池预检测</b>\n\n"
f"正在测试 {pool_count} 个代理 (20 并发)...", f"正在测试 {pool_count} 个代理 (10 并发)...",
parse_mode="HTML" parse_mode="HTML"
) )
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
test_result = await loop.run_in_executor( test_result = await loop.run_in_executor(
self.executor, self.executor,
lambda: proxy_pool.test_and_clean_proxies(concurrency=20) lambda: proxy_pool.test_and_clean_proxies(concurrency=10)
) )
await self.app.bot.send_message( await self.app.bot.send_message(
chat_id, chat_id,
@@ -6459,7 +6459,7 @@ class ProvisionerBot:
msg = await update.message.reply_text( msg = await update.message.reply_text(
f"🔄 <b>正在测试代理</b>\n\n" f"🔄 <b>正在测试代理</b>\n\n"
f"📊 代理数量: <b>{pool_count}</b>\n" f"📊 代理数量: <b>{pool_count}</b>\n"
f"⚡ 并发数: <b>20</b>\n" f"⚡ 并发数: <b>10</b>\n"
f"🎯 测试目标: <code>api.openai.com</code>\n" f"🎯 测试目标: <code>api.openai.com</code>\n"
f"⏱ 超时: <b>10s</b>\n\n" f"⏱ 超时: <b>10s</b>\n\n"
f"⏳ 测试进行中...", f"⏳ 测试进行中...",
@@ -6469,7 +6469,7 @@ class ProvisionerBot:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
result = await loop.run_in_executor( result = await loop.run_in_executor(
self.executor, self.executor,
lambda: proxy_pool.test_and_clean_proxies(concurrency=20) lambda: proxy_pool.test_and_clean_proxies(concurrency=10)
) )
# 构建详细结果 # 构建详细结果
@@ -7134,7 +7134,7 @@ class ProvisionerBot:
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
test_result = await loop.run_in_executor( test_result = await loop.run_in_executor(
self.executor, self.executor,
lambda: proxy_pool.test_and_clean_proxies(concurrency=20) lambda: proxy_pool.test_and_clean_proxies(concurrency=10)
) )
log.info(f"[Scheduler] 代理池: 存活 {test_result['alive']}/{test_result['total']},移除 {test_result['removed']}") log.info(f"[Scheduler] 代理池: 存活 {test_result['alive']}/{test_result['total']},移除 {test_result['removed']}")