diff --git a/bot.py b/bot.py index 7052e82..5cd6cf6 100644 --- a/bot.py +++ b/bot.py @@ -265,15 +265,54 @@ async def cmd_proxytest(update: Update, context: ContextTypes.DEFAULT_TYPE): await update.message.reply_text("❌ 代理池为空(proxy.txt 不存在或无有效代理)") return + total = pp.count status_msg = await update.message.reply_text( - f"🔍 正在测试 {pp.count} 个代理...(可能需要一些时间)" + f"🔍 正在测试 {total} 个代理...\n" + f"{_progress_bar(0, total)}", + parse_mode="HTML", ) - # 在线程中执行测试避免阻塞事件循环 loop = asyncio.get_event_loop() - results = await loop.run_in_executor(None, pp.test_all) - # 构建结果报告 + # 在后台线程中运行测试,通过回调更新进度 + def _run_test(): + ok_count = 0 + fail_count = 0 + last_update_time = 0 + + def on_progress(current, total, result): + nonlocal ok_count, fail_count, last_update_time + if result["ok"]: + ok_count += 1 + else: + fail_count += 1 + + # 限制更新频率:最少 2 秒更新一次,或者最后一个时强制更新 + now = time.time() + is_last = (current == total) + if not is_last and (now - last_update_time) < 2: + return + last_update_time = now + + icon = "✅" if result["ok"] else "❌" + latency = f"{result['latency_ms']}ms" if result.get('latency_ms', -1) > 0 else "-" + + text = ( + f"🔍 代理测试中...\n" + f"{_progress_bar(current, total)}\n\n" + f"✅ 通过: {ok_count} ❌ 失败: {fail_count}\n\n" + f"最新: {icon} {result['proxy']} {latency}" + ) + + asyncio.run_coroutine_threadsafe( + _edit_or_send(status_msg, text), loop + ) + + return pp.test_all(progress_callback=on_progress) + + results = await loop.run_in_executor(None, _run_test) + + # 构建最终结果报告 ok_count = sum(1 for r in results if r["ok"]) fail_count = len(results) - ok_count diff --git a/core/mail_service.py b/core/mail_service.py index d8b721a..d09ddca 100644 --- a/core/mail_service.py +++ b/core/mail_service.py @@ -2,9 +2,7 @@ import time import re import random import threading -import requests as standard_requests # 用于普通API交互 - -from config import get_proxy +import requests as standard_requests # 用于普通API交互(不走代理,直连邮件服务器) class MailSystem: @@ -35,7 +33,7 @@ class MailSystem: ] } try: - resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15) + resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15) if resp.json().get('code') == 200: print(f"[+] 邮箱用户创建成功: {full_email}") return full_email @@ -69,7 +67,7 @@ class MailSystem: return None try: - resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15) + resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15) data = resp.json() if resp.status_code in (401, 403): @@ -98,7 +96,7 @@ class MailSystem: try: url = f"{self.base_url}/api/public/emailList" payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1} - resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=10) + resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=10) if resp.status_code == 200: return {"ok": True, "message": "连接正常"} elif resp.status_code in (401, 403): diff --git a/core/proxy_pool.py b/core/proxy_pool.py index 289fa13..f0f2c43 100644 --- a/core/proxy_pool.py +++ b/core/proxy_pool.py @@ -8,14 +8,18 @@ - 线程安全 """ +import logging import random import threading import time from dataclasses import dataclass, field from pathlib import Path +from typing import Callable, Optional import requests as std_requests +logger = logging.getLogger(__name__) + # --- 配置常量 --- _PROJECT_ROOT = Path(__file__).parent.parent @@ -184,6 +188,7 @@ class ProxyPool: def test_one(self, proxy: Proxy) -> dict: """测试单个代理,返回结果 dict""" + logger.info(f"🔍 测试代理: {proxy.masked_url}") proxies = {"http": proxy.url, "https": proxy.url} try: start = time.time() @@ -202,11 +207,13 @@ class ProxyPool: proxy.last_test_ok = True proxy.success_count += 1 proxy.priority = min(proxy.priority + _SUCCESS_BONUS, _MAX_PRIORITY) + logger.info(f" ✅ 通过 {proxy.masked_url} | {round(latency)}ms | HTTP {resp.status_code}") return {"ok": True, "latency_ms": round(latency), "status": resp.status_code} else: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY + logger.warning(f" ❌ 失败 {proxy.masked_url} | HTTP {resp.status_code}") return {"ok": False, "latency_ms": round(latency), "error": f"HTTP {resp.status_code}"} except std_requests.exceptions.ConnectTimeout: @@ -214,35 +221,58 @@ class ProxyPool: proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY proxy.last_test_time = time.time() + logger.warning(f" ❌ 超时 {proxy.masked_url}") return {"ok": False, "latency_ms": -1, "error": "连接超时"} except std_requests.exceptions.ProxyError as e: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY proxy.last_test_time = time.time() + logger.warning(f" ❌ 代理错误 {proxy.masked_url}: {e}") return {"ok": False, "latency_ms": -1, "error": f"代理错误: {e}"} except Exception as e: proxy.last_test_ok = False proxy.fail_count += 1 proxy.priority -= _FAIL_PENALTY proxy.last_test_time = time.time() + logger.warning(f" ❌ 异常 {proxy.masked_url}: {e}") return {"ok": False, "latency_ms": -1, "error": str(e)} - def test_all(self) -> list[dict]: + def test_all(self, progress_callback: Optional[Callable] = None) -> list[dict]: """ 测试所有代理,返回结果列表。 测试后自动清理优先级过低的代理。 + + Args: + progress_callback: 可选回调函数,签名 (current, total, result_dict) -> None + 每测完一个代理后调用,用于更新前端进度。 """ results = [] with self._lock: proxies_snapshot = list(self._proxies) - for proxy in proxies_snapshot: + total = len(proxies_snapshot) + logger.info(f"📡 开始批量测试 {total} 个代理...") + + for i, proxy in enumerate(proxies_snapshot, 1): result = self.test_one(proxy) result["proxy"] = proxy.masked_url result["priority"] = proxy.priority results.append(result) + if progress_callback: + try: + progress_callback(i, total, result) + except Exception: + pass + + ok_count = sum(1 for r in results if r["ok"]) + fail_count = total - ok_count + logger.info( + f"📡 代理测试完成: ✅ 通过 {ok_count} | ❌ 失败 {fail_count} | " + f"剩余可用 {self.active_count}" + ) + with self._lock: self._cleanup()