diff --git a/proxy_pool.py b/proxy_pool.py
index b4ce1d5..a0d1a34 100644
--- a/proxy_pool.py
+++ b/proxy_pool.py
@@ -127,7 +127,7 @@ def save_proxies(proxies: list[str]):
print(f"[ProxyPool] 保存代理文件失败: {e}")
-def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
+def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict:
"""测试单个代理是否可用
Args:
@@ -135,13 +135,13 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
timeout: 超时秒数
Returns:
- bool: 代理是否可用
+ dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str}
"""
proxies_dict = {"http": proxy_url, "https": proxy_url}
+ start = time.time()
try:
if CURL_AVAILABLE:
- # 使用 curl_cffi (更好的指纹)
resp = curl_requests.head(
TEST_URL,
proxies=proxies_dict,
@@ -150,17 +150,18 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool:
impersonate="edge",
)
else:
- # 回退到 requests
resp = requests.head(
TEST_URL,
proxies=proxies_dict,
timeout=timeout,
verify=False,
)
- # 任何响应都算成功 (包括 401/403,说明代理本身是通的)
- return True
- except Exception:
- return False
+ latency = int((time.time() - start) * 1000)
+ return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""}
+ except Exception as e:
+ latency = int((time.time() - start) * 1000)
+ err_msg = str(e)[:50]
+ return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg}
class ProxyPool:
@@ -192,12 +193,12 @@ class ProxyPool:
timeout: 单个代理超时秒数
Returns:
- dict: {"total": int, "alive": int, "removed": int, "duration": float}
+ dict: {"total": int, "alive": int, "removed": int, "duration": float, "details": list}
"""
# 先从文件加载最新
all_proxies = load_proxies()
if not all_proxies:
- self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0}
+ self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []}
return self._last_test_results
total = len(all_proxies)
@@ -205,7 +206,7 @@ class ProxyPool:
# 并发测试
alive_proxies = []
- dead_proxies = []
+ details = [] # 每个代理的详细结果
with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_proxy = {
@@ -216,31 +217,37 @@ class ProxyPool:
for future in concurrent.futures.as_completed(future_to_proxy):
proxy = future_to_proxy[future]
try:
- is_alive = future.result()
- if is_alive:
+ result = future.result()
+ details.append(result)
+ if result["alive"]:
alive_proxies.append(proxy)
- else:
- dead_proxies.append(proxy)
- except Exception:
- dead_proxies.append(proxy)
+ except Exception as e:
+ details.append({"proxy": proxy, "alive": False, "latency_ms": 0, "error": str(e)[:50]})
+
+ # 按原始顺序排序 details
+ proxy_order = {p: i for i, p in enumerate(all_proxies)}
+    details.sort(key=lambda d: proxy_order.get(d["proxy"], len(all_proxies)))
duration = time.time() - start_time
- # 更新工作代理池
+    _alive_set = set(alive_proxies)  # 更新工作代理池 (保持原始顺序)
+    ordered_alive = [p for p in all_proxies if p in _alive_set]
with self._lock:
- self._working_proxies = alive_proxies
+ self._working_proxies = ordered_alive
self._index = 0
- # 保存存活的代理到文件 (移除死亡代理)
- if dead_proxies:
- save_proxies(alive_proxies)
+ # 保存存活的代理到文件
+ dead_count = total - len(ordered_alive)
+ if dead_count > 0:
+ save_proxies(ordered_alive)
self._last_test_time = time.time()
self._last_test_results = {
"total": total,
- "alive": len(alive_proxies),
- "removed": len(dead_proxies),
+ "alive": len(ordered_alive),
+ "removed": dead_count,
"duration": round(duration, 1),
+ "details": details,
}
return self._last_test_results
diff --git a/telegram_bot.py b/telegram_bot.py
index 557944e..87af921 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -6458,9 +6458,11 @@ class ProvisionerBot:
msg = await update.message.reply_text(
f"🔄 正在测试代理\n\n"
- f"代理数量: {pool_count}\n"
- f"并发: 20\n"
- f"请稍候...",
+ f"📊 代理数量: {pool_count}\n"
+ f"⚡ 并发数: 20\n"
+ f"🎯 测试目标: api.openai.com\n"
+ f"⏱ 超时: 10s\n\n"
+ f"⏳ 测试进行中...",
parse_mode="HTML"
)
@@ -6470,21 +6472,101 @@ class ProvisionerBot:
lambda: proxy_pool.test_and_clean_proxies(concurrency=20)
)
- await msg.edit_text(
- f"✅ 代理测试完成\n\n"
- f"总计: {result['total']}\n"
- f"存活: {result['alive']} ✅\n"
- f"移除: {result['removed']} ❌\n"
- f"耗时: {result['duration']}s\n\n"
- f"{'💡 不可用代理已从 proxy.txt 中移除' if result['removed'] > 0 else '🎉 所有代理均可用'}",
- parse_mode="HTML"
- )
+ # 构建详细结果
+ details = result.get("details", [])
+ total = result["total"]
+ alive = result["alive"]
+ removed = result["removed"]
+ duration = result["duration"]
+
+ # 脱敏函数
+ def mask_proxy(proxy_url):
+ if "@" in proxy_url:
+ parts = proxy_url.split("@")
+ scheme_auth = parts[0]
+ host_part = parts[-1]
+ if "://" in scheme_auth:
+ scheme = scheme_auth.split("://")[0]
+ return f"{scheme}://***@{host_part}"
+ return f"***@{host_part}"
+ return proxy_url
+
+ # 延迟等级图标
+ def latency_icon(ms):
+ if ms < 500:
+ return "🟢"
+ elif ms < 1500:
+ return "🟡"
+ elif ms < 3000:
+ return "🟠"
+ else:
+ return "🔴"
+
+ # 构建每个代理的结果行
+ lines = [f"🌐 代理池测试报告\n"]
+
+ # 统计信息
+ alive_rate = (alive / total * 100) if total > 0 else 0
+ alive_latencies = [d["latency_ms"] for d in details if d["alive"]]
+ avg_latency = int(sum(alive_latencies) / len(alive_latencies)) if alive_latencies else 0
+ min_latency = min(alive_latencies) if alive_latencies else 0
+ max_latency = max(alive_latencies) if alive_latencies else 0
+
+ lines.append(f"┌─────────────────────")
+ lines.append(f"│ 📊 总计: {total} ✅ 存活: {alive} ❌ 移除: {removed}")
+ lines.append(f"│ 📈 存活率: {alive_rate:.0f}% ⏱ 耗时: {duration}s")
+ if alive > 0:
+ lines.append(f"│ ⚡ 延迟: 平均 {avg_latency}ms / 最快 {min_latency}ms / 最慢 {max_latency}ms")
+ lines.append(f"└─────────────────────\n")
+
+ # 每个代理的详细结果 (最多显示50个)
+ show_details = details[:50]
+ for i, d in enumerate(show_details, 1):
+ masked = mask_proxy(d["proxy"])
+ if d["alive"]:
+ icon = latency_icon(d["latency_ms"])
+ lines.append(
+ f"{icon} {masked}\n"
+ f" ✅ {d['latency_ms']}ms"
+ )
+ else:
+ lines.append(
+ f"❌ {masked}\n"
+ f" 💀 超时/失败"
+ )
+
+ if len(details) > 50:
+ lines.append(f"\n... 还有 {len(details) - 50} 个未显示")
+
+ # 底部提示
+ lines.append("")
+ if removed > 0:
+ lines.append(f"🗑 已从 proxy.txt 移除 {removed} 个不可用代理")
+ else:
+ lines.append("🎉 所有代理均可用!")
+
+ text = "\n".join(lines)
+
+ # Telegram 消息长度限制 4096
+ if len(text) > 4000:
+ # 截断详情,只保留摘要
+            summary_lines = lines[:6] if alive > 0 else lines[:5]  # 仅保留头部统计 (标题+统计框)
+ summary_lines.append(f"\n(详情过多已省略,共 {total} 个代理)")
+ summary_lines.append("")
+ if removed > 0:
+ summary_lines.append(f"🗑 已从 proxy.txt 移除 {removed} 个不可用代理")
+ else:
+ summary_lines.append("🎉 所有代理均可用!")
+ text = "\n".join(summary_lines)
+
+ await msg.edit_text(text, parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ proxy_pool 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 代理测试失败: {e}")
+
@admin_only
async def cmd_proxy_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""从 proxy.txt 重新加载代理"""