From a23b7e8d273cbd91eb077aa5c2e5fbb8374880ee Mon Sep 17 00:00:00 2001 From: kyx236 Date: Sat, 14 Feb 2026 02:52:07 +0800 Subject: [PATCH] feat: Implement proxy pool management and integrate it into the bot. --- bot.py | 83 +++++++++++++++++++++++++++++++++++----------- core/proxy_pool.py | 43 +++++++++++++++++------- 2 files changed, 95 insertions(+), 31 deletions(-) diff --git a/bot.py b/bot.py index 4ef9321..c7b2af8 100644 --- a/bot.py +++ b/bot.py @@ -660,35 +660,80 @@ async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE): from curl_cffi import requests as cffi_requests from config import get_proxy - results = [] - for i, acc in enumerate(accounts, 1): - sk = acc.get("session_key", "") - email = acc.get("email", "?") - if not sk: - results.append({"email": email, "ok": False, "reason": "SK 为空"}) - continue + _VERIFY_URL = "https://claude.ai/api/organizations" + _MAX_RETRIES = 2 # 代理重试次数 + def _do_verify_request(sk: str, proxies: dict, timeout: int = 15): + """发送验证请求,返回 (status_code, error_str)""" try: resp = cffi_requests.get( - "https://claude.ai/api/organizations", + _VERIFY_URL, headers={ "Cookie": f"sessionKey={sk}", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36", }, impersonate="chrome124", - proxies=get_proxy(), - timeout=15, + proxies=proxies, + timeout=timeout, ) - if resp.status_code == 200: - results.append({"email": email, "ok": True, "reason": "有效"}) - elif resp.status_code == 401: - results.append({"email": email, "ok": False, "reason": "已过期"}) - elif resp.status_code == 403: - results.append({"email": email, "ok": False, "reason": "被封禁"}) - else: - results.append({"email": email, "ok": False, "reason": f"HTTP {resp.status_code}"}) + return resp.status_code, None except Exception as e: - results.append({"email": email, "ok": False, "reason": str(e)[:50]}) + return None, str(e)[:80] + + results = [] + total = len(accounts) + for i, acc in enumerate(accounts, 1): + sk = acc.get("session_key", "") + email = acc.get("email", "?") + + # 更新进度 + bar = _progress_bar(i - 1, total) + await _edit_or_send( + status_msg, + f"🔍 验证中... {bar}\n" + f"⏳ {email}", + ) + + if not sk: + results.append({"email": email, "ok": False, "reason": "SK 为空"}) + continue + + status_code = None + last_error = None + used_direct = False + + # 阶段 1:通过代理尝试(最多 _MAX_RETRIES 次) + proxy = get_proxy() + if proxy: + for attempt in range(_MAX_RETRIES): + status_code, err = _do_verify_request(sk, proxy) + if status_code is not None: + break + last_error = err + logger.warning(f"验证重试 {attempt + 1}/{_MAX_RETRIES} ({email}): {err}") + time.sleep(1) + + # 阶段 2:代理全部失败 → 直连回退 + if status_code is None: + logger.info(f"代理失败,直连回退验证: {email}") + status_code, err = _do_verify_request(sk, {}, timeout=20) + if status_code is None: + last_error = err + else: + used_direct = True + + # 解析结果 + if status_code is None: + results.append({"email": email, "ok": False, "reason": f"网络错误: {last_error}"}) + elif status_code == 200: + suffix = " (直连)" if used_direct else "" + results.append({"email": email, "ok": True, "reason": f"有效{suffix}"}) + elif status_code == 401: + results.append({"email": email, "ok": False, "reason": "已过期"}) + elif status_code == 403: + results.append({"email": email, "ok": False, "reason": "被封禁"}) + else: + results.append({"email": email, "ok": False, "reason": f"HTTP {status_code}"}) valid = sum(1 for r in results if r["ok"]) invalid = len(results) - valid diff --git a/core/proxy_pool.py b/core/proxy_pool.py index f0f2c43..32ed0e5 100644 --- a/core/proxy_pool.py +++ b/core/proxy_pool.py @@ -238,35 +238,54 @@ class ProxyPool: logger.warning(f" ❌ 异常 {proxy.masked_url}: {e}") return {"ok": False, "latency_ms": -1, "error": str(e)} - def test_all(self, progress_callback: Optional[Callable] = None) -> list[dict]: + def test_all(self, progress_callback: Optional[Callable] = None, max_workers: int = 5) -> list[dict]: """ - 测试所有代理,返回结果列表。 + 并发测试所有代理,返回结果列表。 测试后自动清理优先级过低的代理。 Args: progress_callback: 可选回调函数,签名 (current, total, result_dict) -> None 每测完一个代理后调用,用于更新前端进度。 + max_workers: 并发测试线程数,默认 5。 """ - results = [] + from concurrent.futures import ThreadPoolExecutor, as_completed + with self._lock: proxies_snapshot = list(self._proxies) total = len(proxies_snapshot) - logger.info(f"📡 开始批量测试 {total} 个代理...") + logger.info(f"📡 开始并发测试 {total} 个代理({max_workers} 并发)...") - for i, proxy in enumerate(proxies_snapshot, 1): + results = [None] * total # 保持顺序 + completed = [0] # 用列表以便在闭包中修改 + results_lock = threading.Lock() + + def _test_proxy(index, proxy): result = self.test_one(proxy) result["proxy"] = proxy.masked_url result["priority"] = proxy.priority - results.append(result) + return index, result - if progress_callback: - try: - progress_callback(i, total, result) - except Exception: - pass + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = { + executor.submit(_test_proxy, i, proxy): i + for i, proxy in enumerate(proxies_snapshot) + } + for future in as_completed(futures): + idx, result = future.result() + results[idx] = result - ok_count = sum(1 for r in results if r["ok"]) + with results_lock: + completed[0] += 1 + current = completed[0] + + if progress_callback: + try: + progress_callback(current, total, result) + except Exception: + pass + + ok_count = sum(1 for r in results if r and r["ok"]) fail_count = total - ok_count logger.info( f"📡 代理测试完成: ✅ 通过 {ok_count} | ❌ 失败 {fail_count} | "