feat: Enhance proxy testing to return detailed results including latency and errors, and display a comprehensive, formatted report in the Telegram bot.

This commit is contained in:
2026-02-11 02:18:24 +08:00
parent be8dd745fb
commit 660d43161d
2 changed files with 125 additions and 36 deletions

View File

@@ -127,7 +127,7 @@ def save_proxies(proxies: list[str]):
print(f"[ProxyPool] 保存代理文件失败: {e}")
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
"""测试单个代理是否可用
Args:
@@ -135,13 +135,13 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
timeout: 超时秒数
Returns:
bool: 代理是否可用
dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str}
"""
proxies_dict = {"http": proxy_url, "https": proxy_url}
start = time.time()
try:
if CURL_AVAILABLE:
# 使用 curl_cffi (更好的指纹)
resp = curl_requests.head(
TEST_URL,
proxies=proxies_dict,
@@ -150,17 +150,18 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
impersonate="edge",
)
else:
# 回退到 requests
resp = requests.head(
TEST_URL,
proxies=proxies_dict,
timeout=timeout,
verify=False,
)
# 任何响应都算成功 (包括 401/403说明代理本身是通的)
return True
except Exception:
return False
latency = int((time.time() - start) * 1000)
return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""}
except Exception as e:
latency = int((time.time() - start) * 1000)
err_msg = str(e)[:50]
return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg}
class ProxyPool:
@@ -192,12 +193,12 @@ class ProxyPool:
timeout: 单个代理超时秒数
Returns:
dict: {"total": int, "alive": int, "removed": int, "duration": float}
dict: {"total": int, "alive": int, "removed": int, "duration": float, "details": list}
"""
# 先从文件加载最新
all_proxies = load_proxies()
if not all_proxies:
self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0}
self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []}
return self._last_test_results
total = len(all_proxies)
@@ -205,7 +206,7 @@ class ProxyPool:
# 并发测试
alive_proxies = []
dead_proxies = []
details = [] # 每个代理的详细结果
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_proxy = {
@@ -216,31 +217,37 @@ class ProxyPool:
for future in concurrent.futures.as_completed(future_to_proxy):
proxy = future_to_proxy[future]
try:
is_alive = future.result()
if is_alive:
result = future.result()
details.append(result)
if result["alive"]:
alive_proxies.append(proxy)
else:
dead_proxies.append(proxy)
except Exception:
dead_proxies.append(proxy)
except Exception as e:
details.append({"proxy": proxy, "alive": False, "latency_ms": 0, "error": str(e)[:50]})
# 按原始顺序排序 details
proxy_order = {p: i for i, p in enumerate(all_proxies)}
details.sort(key=lambda d: proxy_order.get(d["proxy"], 999))
duration = time.time() - start_time
# 更新工作代理池
# 更新工作代理池 (保持原始顺序)
ordered_alive = [p for p in all_proxies if p in set(alive_proxies)]
with self._lock:
self._working_proxies = alive_proxies
self._working_proxies = ordered_alive
self._index = 0
# 保存存活的代理到文件 (移除死亡代理)
if dead_proxies:
save_proxies(alive_proxies)
# 保存存活的代理到文件
dead_count = total - len(ordered_alive)
if dead_count > 0:
save_proxies(ordered_alive)
self._last_test_time = time.time()
self._last_test_results = {
"total": total,
"alive": len(alive_proxies),
"removed": len(dead_proxies),
"alive": len(ordered_alive),
"removed": dead_count,
"duration": round(duration, 1),
"details": details,
}
return self._last_test_results

View File

@@ -6458,9 +6458,11 @@ class ProvisionerBot:
msg = await update.message.reply_text(
f"🔄 <b>正在测试代理</b>\n\n"
f"代理数量: {pool_count}\n"
f"并发: 20\n"
f"请稍候...",
f"📊 代理数量: <b>{pool_count}</b>\n"
f"并发: <b>20</b>\n"
f"🎯 测试目标: <code>api.openai.com</code>\n"
f"⏱ 超时: <b>10s</b>\n\n"
f"⏳ 测试进行中...",
parse_mode="HTML"
)
@@ -6470,21 +6472,101 @@ class ProvisionerBot:
lambda: proxy_pool.test_and_clean_proxies(concurrency=20)
)
await msg.edit_text(
f"✅ <b>代理测试完成</b>\n\n"
f"总计: {result['total']}\n"
f"存活: {result['alive']}\n"
f"移除: {result['removed']}\n"
f"耗时: {result['duration']}s\n\n"
f"{'💡 不可用代理已从 proxy.txt 中移除' if result['removed'] > 0 else '🎉 所有代理均可用'}",
parse_mode="HTML"
# 构建详细结果
details = result.get("details", [])
total = result["total"]
alive = result["alive"]
removed = result["removed"]
duration = result["duration"]
# 脱敏函数
def mask_proxy(proxy_url):
if "@" in proxy_url:
parts = proxy_url.split("@")
scheme_auth = parts[0]
host_part = parts[-1]
if "://" in scheme_auth:
scheme = scheme_auth.split("://")[0]
return f"{scheme}://***@{host_part}"
return f"***@{host_part}"
return proxy_url
# 延迟等级图标
def latency_icon(ms):
if ms < 500:
return "🟢"
elif ms < 1500:
return "🟡"
elif ms < 3000:
return "🟠"
else:
return "🔴"
# 构建每个代理的结果行
lines = [f"<b>🌐 代理池测试报告</b>\n"]
# 统计信息
alive_rate = (alive / total * 100) if total > 0 else 0
alive_latencies = [d["latency_ms"] for d in details if d["alive"]]
avg_latency = int(sum(alive_latencies) / len(alive_latencies)) if alive_latencies else 0
min_latency = min(alive_latencies) if alive_latencies else 0
max_latency = max(alive_latencies) if alive_latencies else 0
lines.append(f"┌─────────────────────")
lines.append(f"│ 📊 总计: <b>{total}</b> ✅ 存活: <b>{alive}</b> ❌ 移除: <b>{removed}</b>")
lines.append(f"│ 📈 存活率: <b>{alive_rate:.0f}%</b> ⏱ 耗时: <b>{duration}s</b>")
if alive > 0:
lines.append(f"│ ⚡ 延迟: 平均 <b>{avg_latency}ms</b> / 最快 <b>{min_latency}ms</b> / 最慢 <b>{max_latency}ms</b>")
lines.append(f"└─────────────────────\n")
# 每个代理的详细结果 (最多显示50个)
show_details = details[:50]
for i, d in enumerate(show_details, 1):
masked = mask_proxy(d["proxy"])
if d["alive"]:
icon = latency_icon(d["latency_ms"])
lines.append(
f"{icon} <code>{masked}</code>\n"
f"{d['latency_ms']}ms"
)
else:
lines.append(
f"❌ <code>{masked}</code>\n"
f" 💀 超时/失败"
)
if len(details) > 50:
lines.append(f"\n... 还有 {len(details) - 50} 个未显示")
# 底部提示
lines.append("")
if removed > 0:
lines.append(f"🗑 已从 <code>proxy.txt</code> 移除 {removed} 个不可用代理")
else:
lines.append("🎉 所有代理均可用!")
text = "\n".join(lines)
# Telegram 消息长度限制 4096
if len(text) > 4000:
# 截断详情,只保留摘要
summary_lines = lines[:8] # 头部统计
summary_lines.append(f"\n(详情过多已省略,共 {total} 个代理)")
summary_lines.append("")
if removed > 0:
summary_lines.append(f"🗑 已从 proxy.txt 移除 {removed} 个不可用代理")
else:
summary_lines.append("🎉 所有代理均可用!")
text = "\n".join(summary_lines)
await msg.edit_text(text, parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ proxy_pool 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 代理测试失败: {e}")
@admin_only
async def cmd_proxy_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""从 proxy.txt 重新加载代理"""