From 660d43161dac0faba717a6f4e6385e683030d1cd Mon Sep 17 00:00:00 2001 From: kyx236 Date: Wed, 11 Feb 2026 02:18:24 +0800 Subject: [PATCH] feat: Enhance proxy testing to return detailed results including latency and errors, and display a comprehensive, formatted report in the Telegram bot. --- proxy_pool.py | 55 ++++++++++++++----------- telegram_bot.py | 106 ++++++++++++++++++++++++++++++++++++++++++------ 2 files changed, 125 insertions(+), 36 deletions(-) diff --git a/proxy_pool.py b/proxy_pool.py index b4ce1d5..a0d1a34 100644 --- a/proxy_pool.py +++ b/proxy_pool.py @@ -127,7 +127,7 @@ def save_proxies(proxies: list[str]): print(f"[ProxyPool] 保存代理文件失败: {e}") -def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool: +def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> dict: """测试单个代理是否可用 Args: @@ -135,13 +135,13 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool: timeout: 超时秒数 Returns: - bool: 代理是否可用 + dict: {"proxy": str, "alive": bool, "latency_ms": int, "error": str} """ proxies_dict = {"http": proxy_url, "https": proxy_url} + start = time.time() try: if CURL_AVAILABLE: - # 使用 curl_cffi (更好的指纹) resp = curl_requests.head( TEST_URL, proxies=proxies_dict, @@ -150,17 +150,18 @@ def test_single_proxy(proxy_url: str, timeout: int = TEST_TIMEOUT) -> bool: impersonate="edge", ) else: - # 回退到 requests resp = requests.head( TEST_URL, proxies=proxies_dict, timeout=timeout, verify=False, ) - # 任何响应都算成功 (包括 401/403,说明代理本身是通的) - return True - except Exception: - return False + latency = int((time.time() - start) * 1000) + return {"proxy": proxy_url, "alive": True, "latency_ms": latency, "error": ""} + except Exception as e: + latency = int((time.time() - start) * 1000) + err_msg = str(e)[:50] + return {"proxy": proxy_url, "alive": False, "latency_ms": latency, "error": err_msg} class ProxyPool: @@ -192,12 +193,12 @@ class ProxyPool: timeout: 单个代理超时秒数 Returns: - dict: {"total": int, "alive": int, "removed": int, "duration": float} + dict: {"total": int, "alive": int, "removed": int, "duration": float, "details": list} """ # 先从文件加载最新 all_proxies = load_proxies() if not all_proxies: - self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0} + self._last_test_results = {"total": 0, "alive": 0, "removed": 0, "duration": 0, "details": []} return self._last_test_results total = len(all_proxies) @@ -205,7 +206,7 @@ class ProxyPool: # 并发测试 alive_proxies = [] - dead_proxies = [] + details = [] # 每个代理的详细结果 with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor: future_to_proxy = { @@ -216,31 +217,37 @@ class ProxyPool: for future in concurrent.futures.as_completed(future_to_proxy): proxy = future_to_proxy[future] try: - is_alive = future.result() - if is_alive: + result = future.result() + details.append(result) + if result["alive"]: alive_proxies.append(proxy) - else: - dead_proxies.append(proxy) - except Exception: - dead_proxies.append(proxy) + except Exception as e: + details.append({"proxy": proxy, "alive": False, "latency_ms": 0, "error": str(e)[:50]}) + + # 按原始顺序排序 details + proxy_order = {p: i for i, p in enumerate(all_proxies)} + details.sort(key=lambda d: proxy_order.get(d["proxy"], 999)) duration = time.time() - start_time - # 更新工作代理池 + # 更新工作代理池 (保持原始顺序) + ordered_alive = [p for p in all_proxies if p in set(alive_proxies)] with self._lock: - self._working_proxies = alive_proxies + self._working_proxies = ordered_alive self._index = 0 - # 保存存活的代理到文件 (移除死亡代理) - if dead_proxies: - save_proxies(alive_proxies) + # 保存存活的代理到文件 + dead_count = total - len(ordered_alive) + if dead_count > 0: + save_proxies(ordered_alive) self._last_test_time = time.time() self._last_test_results = { "total": total, - "alive": len(alive_proxies), - "removed": len(dead_proxies), + "alive": len(ordered_alive), + "removed": dead_count, "duration": round(duration, 1), + "details": details, } return self._last_test_results diff --git a/telegram_bot.py b/telegram_bot.py index 557944e..87af921 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -6458,9 +6458,11 @@ class ProvisionerBot: msg = await update.message.reply_text( f"🔄 正在测试代理\n\n" - f"代理数量: {pool_count}\n" - f"并发: 20\n" - f"请稍候...", + f"📊 代理数量: {pool_count}\n" + f"⚡ 并发数: 20\n" + f"🎯 测试目标: api.openai.com\n" + f"⏱ 超时: 10s\n\n" + f"⏳ 测试进行中...", parse_mode="HTML" ) @@ -6470,21 +6472,101 @@ class ProvisionerBot: lambda: proxy_pool.test_and_clean_proxies(concurrency=20) ) - await msg.edit_text( - f"✅ 代理测试完成\n\n" - f"总计: {result['total']}\n" - f"存活: {result['alive']} ✅\n" - f"移除: {result['removed']} ❌\n" - f"耗时: {result['duration']}s\n\n" - f"{'💡 不可用代理已从 proxy.txt 中移除' if result['removed'] > 0 else '🎉 所有代理均可用'}", - parse_mode="HTML" - ) + # 构建详细结果 + details = result.get("details", []) + total = result["total"] + alive = result["alive"] + removed = result["removed"] + duration = result["duration"] + + # 脱敏函数 + def mask_proxy(proxy_url): + if "@" in proxy_url: + parts = proxy_url.split("@") + scheme_auth = parts[0] + host_part = parts[-1] + if "://" in scheme_auth: + scheme = scheme_auth.split("://")[0] + return f"{scheme}://***@{host_part}" + return f"***@{host_part}" + return proxy_url + + # 延迟等级图标 + def latency_icon(ms): + if ms < 500: + return "🟢" + elif ms < 1500: + return "🟡" + elif ms < 3000: + return "🟠" + else: + return "🔴" + + # 构建每个代理的结果行 + lines = [f"🌐 代理池测试报告\n"] + + # 统计信息 + alive_rate = (alive / total * 100) if total > 0 else 0 + alive_latencies = [d["latency_ms"] for d in details if d["alive"]] + avg_latency = int(sum(alive_latencies) / len(alive_latencies)) if alive_latencies else 0 + min_latency = min(alive_latencies) if alive_latencies else 0 + max_latency = max(alive_latencies) if alive_latencies else 0 + + lines.append(f"┌─────────────────────") + lines.append(f"│ 📊 总计: {total} ✅ 存活: {alive} ❌ 移除: {removed}") + lines.append(f"│ 📈 存活率: {alive_rate:.0f}% ⏱ 耗时: {duration}s") + if alive > 0: + lines.append(f"│ ⚡ 延迟: 平均 {avg_latency}ms / 最快 {min_latency}ms / 最慢 {max_latency}ms") + lines.append(f"└─────────────────────\n") + + # 每个代理的详细结果 (最多显示50个) + show_details = details[:50] + for i, d in enumerate(show_details, 1): + masked = mask_proxy(d["proxy"]) + if d["alive"]: + icon = latency_icon(d["latency_ms"]) + lines.append( + f"{icon} {masked}\n" + f" ✅ {d['latency_ms']}ms" + ) + else: + lines.append( + f"❌ {masked}\n" + f" 💀 超时/失败" + ) + + if len(details) > 50: + lines.append(f"\n... 还有 {len(details) - 50} 个未显示") + + # 底部提示 + lines.append("") + if removed > 0: + lines.append(f"🗑 已从 proxy.txt 移除 {removed} 个不可用代理") + else: + lines.append("🎉 所有代理均可用!") + + text = "\n".join(lines) + + # Telegram 消息长度限制 4096 + if len(text) > 4000: + # 截断详情,只保留摘要 + summary_lines = lines[:8] # 头部统计 + summary_lines.append(f"\n(详情过多已省略,共 {total} 个代理)") + summary_lines.append("") + if removed > 0: + summary_lines.append(f"🗑 已从 proxy.txt 移除 {removed} 个不可用代理") + else: + summary_lines.append("🎉 所有代理均可用!") + text = "\n".join(summary_lines) + + await msg.edit_text(text, parse_mode="HTML") except ImportError: await update.message.reply_text("❌ proxy_pool 模块未找到") except Exception as e: await update.message.reply_text(f"❌ 代理测试失败: {e}") + @admin_only async def cmd_proxy_reload(self, update: Update, context: ContextTypes.DEFAULT_TYPE): """从 proxy.txt 重新加载代理"""