Files
AutoDoneTeam/modules/http_client.py
2026-01-11 10:29:37 +08:00

226 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# modules/http_client.py
"""HTTP 客户端(支持 CSRF token"""
from typing import Dict, Optional, Any
from urllib.parse import unquote, urlparse
try:
from curl_cffi import requests as curl_requests
from curl_cffi.requests import Session as CurlSession
from curl_cffi.requests import Response as CurlResponse
USE_CURL_CFFI = True
ResponseType = CurlResponse
except ImportError:
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.ssl_ import create_urllib3_context
USE_CURL_CFFI = False
ResponseType = requests.Response
print("⚠️ curl_cffi not installed, falling back to requests (may cause 403 errors)")
print(" Install with: pip install curl_cffi")
from .fingerprint import BrowserFingerprint
from config import HTTP_CONFIG, DEBUG
if not USE_CURL_CFFI:
    class TLSAdapter(HTTPAdapter):
        """Custom TLS adapter used only on the plain-``requests`` fallback path."""

        def init_poolmanager(self, *args, **kwargs):
            """Create the pool manager with an SSL context carrying a fixed cipher list.

            All positional and keyword arguments are forwarded unchanged to
            ``HTTPAdapter.init_poolmanager``; only ``ssl_context`` is overridden.
            """
            # Cipher names in the order they are handed to set_ciphers().
            cipher_names = (
                'TLS_AES_128_GCM_SHA256',
                'TLS_AES_256_GCM_SHA384',
                'TLS_CHACHA20_POLY1305_SHA256',
                'ECDHE-ECDSA-AES128-GCM-SHA256',
                'ECDHE-RSA-AES128-GCM-SHA256',
            )
            tls_context = create_urllib3_context()
            tls_context.set_ciphers(':'.join(cipher_names))
            kwargs['ssl_context'] = tls_context
            return super().init_poolmanager(*args, **kwargs)
class HTTPClient:
    """HTTP client that handles the CSRF token automatically; supports curl_cffi.

    Wraps either a ``curl_cffi`` session (when ``USE_CURL_CFFI`` is true) or a
    plain ``requests`` session with ``TLSAdapter`` mounted for HTTPS.

    Attributes:
        fingerprint: Object supplying the initial cookies and request headers.
        use_curl: Whether the curl_cffi backend is in use.
        session: The underlying session object.
        cookies: Alias of ``session.cookies``.
        csrf_token: Most recently captured CSRF token, or ``None``.
    """

    # Cookie whose URL-decoded value starts with the CSRF token, followed by '|'.
    _CSRF_COOKIE = '__Host-next-auth.csrf-token'

    def __init__(self, fingerprint: "BrowserFingerprint"):
        """Create the session and seed it with the fingerprint's cookies.

        Args:
            fingerprint: Provides ``get_cookies()`` and ``get_headers()``.
        """
        self.fingerprint = fingerprint
        self.use_curl = USE_CURL_CFFI

        if self.use_curl:
            # Use curl_cffi to impersonate a Chrome browser.
            impersonate = HTTP_CONFIG.get('impersonate', 'chrome110')
            self.session = CurlSession(impersonate=impersonate)
            if DEBUG:
                print(f"[HTTP] Using curl_cffi with impersonate={impersonate}")
        else:
            # Fall back to requests (may be detected by the server).
            self.session = requests.Session()
            self.session.mount('https://', TLSAdapter())
            if DEBUG:
                print("[HTTP] Using fallback requests (may trigger 403)")

        # Seed the session with the fingerprint's initial cookies.
        for name, value in fingerprint.get_cookies().items():
            self.session.cookies.set(name, value)
        self.cookies = self.session.cookies
        self.csrf_token: Optional[str] = None

        # Only the requests fallback gets an explicit Accept-Encoding; the
        # original note says curl_cffi handles br/zstd decompression itself.
        if not self.use_curl:
            self.session.headers.update({
                'Accept-Encoding': 'gzip, deflate',
            })

    def _lookup_cookie(self, name: str, domain: str) -> Optional[str]:
        """Return cookie *name*, preferring the one scoped to *domain*.

        Cookie jars that accept a ``domain`` keyword are queried with it first
        so that same-named cookies on other hosts cannot shadow or conflict
        with the wanted one. Stores whose ``get`` rejects the keyword (plain
        dicts), and jars holding the cookie under a different domain string,
        fall back to a domain-less lookup.
        """
        try:
            value = self.cookies.get(name, domain=domain)
        except TypeError:
            # dict-style store: .get() takes no keyword arguments.
            value = None
        if value:
            return value
        return self.cookies.get(name)

    def extract_csrf_from_cookies(self, domain: str = "auth.openai.com") -> Optional[str]:
        """Extract the CSRF token from the cookie store.

        Args:
            domain: Host whose cookie is preferred when the store can filter
                by domain.

        Returns:
            The part of the URL-decoded cookie value before the first ``'|'``,
            or ``None`` when the cookie is absent or cannot be decoded.
        """
        csrf_cookie = self._lookup_cookie(self._CSRF_COOKIE, domain)
        if not csrf_cookie:
            return None

        try:
            token = unquote(csrf_cookie).split('|')[0]
        except (TypeError, AttributeError, ValueError) as e:
            if DEBUG:
                print(f"[HTTP] Failed to extract CSRF: {e}")
            return None

        if DEBUG:
            print(f"[HTTP] Extracted CSRF from cookie: {token[:20]}...")
        return token

    def get(self, url: str, **kwargs) -> Any:
        """Send a GET request and refresh ``csrf_token`` from the cookies.

        Args:
            url: Target URL.
            **kwargs: Passed to ``session.get``. ``headers``, ``cookies`` and
                ``timeout`` are filled in only when the caller did not
                supply them.

        Returns:
            The response object returned by the underlying session.
        """
        kwargs.setdefault('headers', self.fingerprint.get_headers())
        kwargs.setdefault('cookies', self.cookies)
        kwargs.setdefault('timeout', HTTP_CONFIG['timeout'])

        if DEBUG:
            print(f"[HTTP] GET {url}")
        resp = self.session.get(url, **kwargs)

        # Try to pick up a CSRF token, preferring the requested host's cookie.
        domain = urlparse(url).hostname or "auth.openai.com"
        csrf = self.extract_csrf_from_cookies(domain=domain)
        if csrf:
            self.csrf_token = csrf

        if DEBUG:
            print(f"[HTTP] Response: {resp.status_code}")
            print(f"Cookies: {list(self.cookies.keys())}")
        return resp

    def get_csrf_token(self, domain: str = "auth.openai.com") -> Optional[str]:
        """Obtain a CSRF token, storing it in ``csrf_token`` on success.

        Tries, in order:
          1. the existing cookies;
          2. a request to ``https://<domain>/api/auth/csrf`` (cookies again,
             then the ``csrfToken`` field of a 200 JSON response);
          3. gives up and returns ``None`` (some flows may not need a token).

        Args:
            domain: Host to query and whose cookie is preferred.

        Returns:
            The token, or ``None`` if none could be obtained. Request errors
            are reported only when ``DEBUG`` is set and never raised.
        """
        # 1. Existing cookies.
        csrf = self.extract_csrf_from_cookies(domain=domain)
        if csrf:
            self.csrf_token = csrf
            return csrf

        # 2. Ask the CSRF endpoint.
        if DEBUG:
            print("[HTTP] Requesting CSRF token from /api/auth/csrf...")
        try:
            url = f"https://{domain}/api/auth/csrf"
            resp = self.session.get(
                url,
                headers=self.fingerprint.get_headers(),
                cookies=self.cookies,
                timeout=HTTP_CONFIG['timeout']
            )

            # The response may have set the cookie.
            csrf = self.extract_csrf_from_cookies(domain=domain)
            if csrf:
                self.csrf_token = csrf
                return csrf

            # Otherwise look in the JSON body.
            if resp.status_code == 200:
                try:
                    csrf = resp.json().get('csrfToken')
                except (ValueError, AttributeError, TypeError) as e:
                    # Body was not JSON, or not a JSON object; keep going.
                    csrf = None
                    if DEBUG:
                        print(f"[HTTP] CSRF response was not usable JSON: {e}")
                if csrf:
                    self.csrf_token = csrf
                    return csrf
        except Exception as e:
            # Best effort: a failed lookup must not abort the caller's flow.
            if DEBUG:
                print(f"[HTTP] Failed to get CSRF token: {e}")

        # 3. Nothing worked.
        if DEBUG:
            print("[HTTP] ⚠ CSRF token not available (may not be required)")
        return None

    def post(self, url: str, json_data: Optional[Dict] = None,
             sentinel_token: Optional[str] = None, **kwargs) -> Any:
        """Send a POST request with CSRF, Referer and Origin headers attached.

        Args:
            url: Target URL.
            json_data: JSON payload. Any value other than ``None`` is sent,
                including an empty dict.
            sentinel_token: Forwarded to ``fingerprint.get_headers`` as
                ``with_sentinel``.
            **kwargs: Passed to ``session.post``. A caller-supplied
                ``headers`` replaces the headers built here entirely;
                ``cookies`` and ``timeout`` are likewise only defaults.

        Returns:
            The response object returned by the underlying session.
        """
        headers = self.fingerprint.get_headers(with_sentinel=sentinel_token)

        # Attach the CSRF token when one has been captured.
        if self.csrf_token:
            headers['openai-sentinel-csrf-token'] = self.csrf_token

        # Required headers.
        headers['Referer'] = 'https://auth.openai.com/'
        headers['Origin'] = 'https://auth.openai.com'

        kwargs.setdefault('headers', headers)
        kwargs.setdefault('cookies', self.cookies)
        kwargs.setdefault('timeout', HTTP_CONFIG['timeout'])

        # `is not None` so that an explicit empty payload ({}) is still sent.
        if json_data is not None:
            kwargs['json'] = json_data

        if DEBUG:
            print(f"[HTTP] POST {url}")
            if self.csrf_token:
                print(f"[HTTP] With CSRF token: {self.csrf_token[:20]}...")
            if sentinel_token:
                print(f"[HTTP] With Sentinel token: {sentinel_token[:50]}...")

        resp = self.session.post(url, **kwargs)
        # Copy the response's cookies into the shared cookie store.
        self.cookies.update(resp.cookies.get_dict())

        if DEBUG:
            print(f"[HTTP] Response: {resp.status_code}")
        return resp