feat: Implement proxy pool management and integrate it into the bot.

This commit is contained in:
2026-02-14 02:52:07 +08:00
parent 768963666a
commit a23b7e8d27
2 changed files with 95 additions and 31 deletions

79
bot.py
View File

@@ -660,35 +660,80 @@ async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE):
from curl_cffi import requests as cffi_requests
from config import get_proxy
results = []
for i, acc in enumerate(accounts, 1):
sk = acc.get("session_key", "")
email = acc.get("email", "?")
if not sk:
results.append({"email": email, "ok": False, "reason": "SK 为空"})
continue
_VERIFY_URL = "https://claude.ai/api/organizations"
_MAX_RETRIES = 2 # 代理重试次数
def _do_verify_request(sk: str, proxies: dict, timeout: int = 15):
"""发送验证请求,返回 (status_code, error_str)"""
try:
resp = cffi_requests.get(
"https://claude.ai/api/organizations",
_VERIFY_URL,
headers={
"Cookie": f"sessionKey={sk}",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
},
impersonate="chrome124",
proxies=get_proxy(),
timeout=15,
proxies=proxies,
timeout=timeout,
)
if resp.status_code == 200:
results.append({"email": email, "ok": True, "reason": "有效"})
elif resp.status_code == 401:
return resp.status_code, None
except Exception as e:
return None, str(e)[:80]
results = []
total = len(accounts)
for i, acc in enumerate(accounts, 1):
sk = acc.get("session_key", "")
email = acc.get("email", "?")
# 更新进度
bar = _progress_bar(i - 1, total)
await _edit_or_send(
status_msg,
f"🔍 验证中... {bar}\n"
f"{email}",
)
if not sk:
results.append({"email": email, "ok": False, "reason": "SK 为空"})
continue
status_code = None
last_error = None
used_direct = False
# 阶段 1通过代理尝试最多 _MAX_RETRIES 次)
proxy = get_proxy()
if proxy:
for attempt in range(_MAX_RETRIES):
status_code, err = _do_verify_request(sk, proxy)
if status_code is not None:
break
last_error = err
logger.warning(f"验证重试 {attempt + 1}/{_MAX_RETRIES} ({email}): {err}")
time.sleep(1)
# 阶段 2代理全部失败 → 直连回退
if status_code is None:
logger.info(f"代理失败,直连回退验证: {email}")
status_code, err = _do_verify_request(sk, {}, timeout=20)
if status_code is None:
last_error = err
else:
used_direct = True
# 解析结果
if status_code is None:
results.append({"email": email, "ok": False, "reason": f"网络错误: {last_error}"})
elif status_code == 200:
suffix = " (直连)" if used_direct else ""
results.append({"email": email, "ok": True, "reason": f"有效{suffix}"})
elif status_code == 401:
results.append({"email": email, "ok": False, "reason": "已过期"})
elif resp.status_code == 403:
elif status_code == 403:
results.append({"email": email, "ok": False, "reason": "被封禁"})
else:
results.append({"email": email, "ok": False, "reason": f"HTTP {resp.status_code}"})
except Exception as e:
results.append({"email": email, "ok": False, "reason": str(e)[:50]})
results.append({"email": email, "ok": False, "reason": f"HTTP {status_code}"})
valid = sum(1 for r in results if r["ok"])
invalid = len(results) - valid

View File

@@ -238,35 +238,54 @@ class ProxyPool:
logger.warning(f" ❌ 异常 {proxy.masked_url}: {e}")
return {"ok": False, "latency_ms": -1, "error": str(e)}
def test_all(self, progress_callback: Optional[Callable] = None) -> list[dict]:
def test_all(self, progress_callback: Optional[Callable] = None, max_workers: int = 5) -> list[dict]:
"""
测试所有代理,返回结果列表。
并发测试所有代理,返回结果列表。
测试后自动清理优先级过低的代理。
Args:
progress_callback: 可选回调函数,签名 (current, total, result_dict) -> None
每测完一个代理后调用,用于更新前端进度。
max_workers: 并发测试线程数,默认 5。
"""
results = []
from concurrent.futures import ThreadPoolExecutor, as_completed
with self._lock:
proxies_snapshot = list(self._proxies)
total = len(proxies_snapshot)
logger.info(f"📡 开始批量测试 {total} 个代理...")
logger.info(f"📡 开始并发测试 {total} 个代理{max_workers} 并发)...")
for i, proxy in enumerate(proxies_snapshot, 1):
results = [None] * total # 保持顺序
completed = [0] # 用列表以便在闭包中修改
results_lock = threading.Lock()
def _test_proxy(index, proxy):
result = self.test_one(proxy)
result["proxy"] = proxy.masked_url
result["priority"] = proxy.priority
results.append(result)
return index, result
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_test_proxy, i, proxy): i
for i, proxy in enumerate(proxies_snapshot)
}
for future in as_completed(futures):
idx, result = future.result()
results[idx] = result
with results_lock:
completed[0] += 1
current = completed[0]
if progress_callback:
try:
progress_callback(i, total, result)
progress_callback(current, total, result)
except Exception:
pass
ok_count = sum(1 for r in results if r["ok"])
ok_count = sum(1 for r in results if r and r["ok"])
fail_count = total - ok_count
logger.info(
f"📡 代理测试完成: ✅ 通过 {ok_count} | ❌ 失败 {fail_count} | "