feat: Add core mail service and proxy pool modules, and integrate them into the bot.

This commit is contained in:
2026-02-13 04:54:27 +08:00
parent d6948591a1
commit e305911e2d
3 changed files with 79 additions and 12 deletions

47
bot.py
View File

@@ -265,15 +265,54 @@ async def cmd_proxytest(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("❌ 代理池为空proxy.txt 不存在或无有效代理)") await update.message.reply_text("❌ 代理池为空proxy.txt 不存在或无有效代理)")
return return
total = pp.count
status_msg = await update.message.reply_text( status_msg = await update.message.reply_text(
f"🔍 正在测试 {pp.count} 个代理...(可能需要一些时间)" f"🔍 正在测试 {total} 个代理...\n"
f"{_progress_bar(0, total)}",
parse_mode="HTML",
) )
# 在线程中执行测试避免阻塞事件循环
loop = asyncio.get_event_loop() loop = asyncio.get_event_loop()
results = await loop.run_in_executor(None, pp.test_all)
# 构建结果报告 # 在后台线程中运行测试,通过回调更新进度
def _run_test():
ok_count = 0
fail_count = 0
last_update_time = 0
def on_progress(current, total, result):
nonlocal ok_count, fail_count, last_update_time
if result["ok"]:
ok_count += 1
else:
fail_count += 1
# 限制更新频率:最少 2 秒更新一次,或者最后一个时强制更新
now = time.time()
is_last = (current == total)
if not is_last and (now - last_update_time) < 2:
return
last_update_time = now
icon = "" if result["ok"] else ""
latency = f"{result['latency_ms']}ms" if result.get('latency_ms', -1) > 0 else "-"
text = (
f"🔍 <b>代理测试中...</b>\n"
f"{_progress_bar(current, total)}\n\n"
f"✅ 通过: {ok_count} ❌ 失败: {fail_count}\n\n"
f"最新: {icon} <code>{result['proxy']}</code> {latency}"
)
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, text), loop
)
return pp.test_all(progress_callback=on_progress)
results = await loop.run_in_executor(None, _run_test)
# 构建最终结果报告
ok_count = sum(1 for r in results if r["ok"]) ok_count = sum(1 for r in results if r["ok"])
fail_count = len(results) - ok_count fail_count = len(results) - ok_count

View File

@@ -2,9 +2,7 @@ import time
import re import re
import random import random
import threading import threading
import requests as standard_requests # 用于普通API交互 import requests as standard_requests # 用于普通API交互(不走代理,直连邮件服务器)
from config import get_proxy
class MailSystem: class MailSystem:
@@ -35,7 +33,7 @@ class MailSystem:
] ]
} }
try: try:
resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15) resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
if resp.json().get('code') == 200: if resp.json().get('code') == 200:
print(f"[+] 邮箱用户创建成功: {full_email}") print(f"[+] 邮箱用户创建成功: {full_email}")
return full_email return full_email
@@ -69,7 +67,7 @@ class MailSystem:
return None return None
try: try:
resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15) resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
data = resp.json() data = resp.json()
if resp.status_code in (401, 403): if resp.status_code in (401, 403):
@@ -98,7 +96,7 @@ class MailSystem:
try: try:
url = f"{self.base_url}/api/public/emailList" url = f"{self.base_url}/api/public/emailList"
payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1} payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1}
resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=10) resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=10)
if resp.status_code == 200: if resp.status_code == 200:
return {"ok": True, "message": "连接正常"} return {"ok": True, "message": "连接正常"}
elif resp.status_code in (401, 403): elif resp.status_code in (401, 403):

View File

@@ -8,14 +8,18 @@
- 线程安全 - 线程安全
""" """
import logging
import random import random
import threading import threading
import time import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from pathlib import Path from pathlib import Path
from typing import Callable, Optional
import requests as std_requests import requests as std_requests
logger = logging.getLogger(__name__)
# --- 配置常量 --- # --- 配置常量 ---
_PROJECT_ROOT = Path(__file__).parent.parent _PROJECT_ROOT = Path(__file__).parent.parent
@@ -184,6 +188,7 @@ class ProxyPool:
def test_one(self, proxy: Proxy) -> dict: def test_one(self, proxy: Proxy) -> dict:
"""测试单个代理,返回结果 dict""" """测试单个代理,返回结果 dict"""
logger.info(f"🔍 测试代理: {proxy.masked_url}")
proxies = {"http": proxy.url, "https": proxy.url} proxies = {"http": proxy.url, "https": proxy.url}
try: try:
start = time.time() start = time.time()
@@ -202,11 +207,13 @@ class ProxyPool:
proxy.last_test_ok = True proxy.last_test_ok = True
proxy.success_count += 1 proxy.success_count += 1
proxy.priority = min(proxy.priority + _SUCCESS_BONUS, _MAX_PRIORITY) proxy.priority = min(proxy.priority + _SUCCESS_BONUS, _MAX_PRIORITY)
logger.info(f" ✅ 通过 {proxy.masked_url} | {round(latency)}ms | HTTP {resp.status_code}")
return {"ok": True, "latency_ms": round(latency), "status": resp.status_code} return {"ok": True, "latency_ms": round(latency), "status": resp.status_code}
else: else:
proxy.last_test_ok = False proxy.last_test_ok = False
proxy.fail_count += 1 proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY proxy.priority -= _FAIL_PENALTY
logger.warning(f" ❌ 失败 {proxy.masked_url} | HTTP {resp.status_code}")
return {"ok": False, "latency_ms": round(latency), "error": f"HTTP {resp.status_code}"} return {"ok": False, "latency_ms": round(latency), "error": f"HTTP {resp.status_code}"}
except std_requests.exceptions.ConnectTimeout: except std_requests.exceptions.ConnectTimeout:
@@ -214,35 +221,58 @@ class ProxyPool:
proxy.fail_count += 1 proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY proxy.priority -= _FAIL_PENALTY
proxy.last_test_time = time.time() proxy.last_test_time = time.time()
logger.warning(f" ❌ 超时 {proxy.masked_url}")
return {"ok": False, "latency_ms": -1, "error": "连接超时"} return {"ok": False, "latency_ms": -1, "error": "连接超时"}
except std_requests.exceptions.ProxyError as e: except std_requests.exceptions.ProxyError as e:
proxy.last_test_ok = False proxy.last_test_ok = False
proxy.fail_count += 1 proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY proxy.priority -= _FAIL_PENALTY
proxy.last_test_time = time.time() proxy.last_test_time = time.time()
logger.warning(f" ❌ 代理错误 {proxy.masked_url}: {e}")
return {"ok": False, "latency_ms": -1, "error": f"代理错误: {e}"} return {"ok": False, "latency_ms": -1, "error": f"代理错误: {e}"}
except Exception as e: except Exception as e:
proxy.last_test_ok = False proxy.last_test_ok = False
proxy.fail_count += 1 proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY proxy.priority -= _FAIL_PENALTY
proxy.last_test_time = time.time() proxy.last_test_time = time.time()
logger.warning(f" ❌ 异常 {proxy.masked_url}: {e}")
return {"ok": False, "latency_ms": -1, "error": str(e)} return {"ok": False, "latency_ms": -1, "error": str(e)}
def test_all(self) -> list[dict]: def test_all(self, progress_callback: Optional[Callable] = None) -> list[dict]:
""" """
测试所有代理,返回结果列表。 测试所有代理,返回结果列表。
测试后自动清理优先级过低的代理。 测试后自动清理优先级过低的代理。
Args:
progress_callback: 可选回调函数,签名 (current, total, result_dict) -> None
每测完一个代理后调用,用于更新前端进度。
""" """
results = [] results = []
with self._lock: with self._lock:
proxies_snapshot = list(self._proxies) proxies_snapshot = list(self._proxies)
for proxy in proxies_snapshot: total = len(proxies_snapshot)
logger.info(f"📡 开始批量测试 {total} 个代理...")
for i, proxy in enumerate(proxies_snapshot, 1):
result = self.test_one(proxy) result = self.test_one(proxy)
result["proxy"] = proxy.masked_url result["proxy"] = proxy.masked_url
result["priority"] = proxy.priority result["priority"] = proxy.priority
results.append(result) results.append(result)
if progress_callback:
try:
progress_callback(i, total, result)
except Exception:
pass
ok_count = sum(1 for r in results if r["ok"])
fail_count = total - ok_count
logger.info(
f"📡 代理测试完成: ✅ 通过 {ok_count} | ❌ 失败 {fail_count} | "
f"剩余可用 {self.active_count}"
)
with self._lock: with self._lock:
self._cleanup() self._cleanup()