Files
autoPlus/core/session.py
2026-01-29 19:16:01 +08:00

369 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
TLS 指纹伪装会话管理模块
核心功能:
- 使用 curl_cffi 模拟 Chrome 浏览器的 TLS 指纹
- 管理关键 Cookie (oai-did, __Secure-next-auth 系列)
- 统一的错误处理 (403 Cloudflare 拦截, 409 会话冲突)
- 代理支持
"""
from curl_cffi import requests
from typing import Optional, Dict, Any
from utils.crypto import generate_oai_did
from utils.logger import logger
class CloudflareBlockError(Exception):
"""Cloudflare 拦截异常403 + Turnstile 挑战)"""
pass
class SessionInvalidError(Exception):
"""会话失效异常409 Conflict - CSRF Token 断链)"""
pass
class RateLimitError(Exception):
"""速率限制异常429 Too Many Requests"""
pass
class OAISession:
"""
OpenAI 会话管理器
使用 curl_cffi 库模拟真实 Chrome 浏览器的 TLS 指纹,绕过 OpenAI 的检测
关键特性:
- TLS 指纹伪装 (impersonate="chrome124")
- oai-did Cookie 管理(设备指纹)
- 自动错误检测和异常抛出
- 代理支持HTTP/HTTPS/SOCKS5
- 登录获取 access_token
"""
# OpenAI 相关域名
CHATGPT_DOMAIN = "chatgpt.com"
AUTH_DOMAIN = "auth.openai.com"
API_DOMAIN = "api.openai.com"
def __init__(self, proxy: Optional[str] = None, impersonate: str = "chrome124"):
"""
初始化会话
参数:
proxy: 代理地址,支持格式:
- HTTP: "http://user:pass@ip:port"
- HTTPS: "https://user:pass@ip:port"
- SOCKS5: "socks5://user:pass@ip:port"
impersonate: 模拟的浏览器版本,可选值:
- "chrome110", "chrome120", "chrome124" (推荐)
- 需要根据实际情况测试最佳版本
"""
# 创建 curl_cffi 会话(核心!)
self.client = requests.Session(
impersonate=impersonate,
timeout=30
)
# 配置代理
if proxy:
self.client.proxies = {
"http": proxy,
"https": proxy
}
logger.info(f"Session using proxy: {self._mask_proxy(proxy)}")
else:
logger.info("Session initialized without proxy")
# 设置请求头(从真实浏览器抓包)
self._setup_headers()
# 生成并设置 oai-did Cookie关键设备指纹
self.oai_did = generate_oai_did()
self.client.cookies.set(
"oai-did",
self.oai_did,
domain=f".{self.CHATGPT_DOMAIN}"
)
logger.info(f"Session initialized with oai-did: {self.oai_did}")
# 登录状态
self.access_token: Optional[str] = None
self.session_token: Optional[str] = None
self.logged_in_email: Optional[str] = None
def _setup_headers(self):
"""
设置 HTTP 请求头,模拟真实 Chrome 浏览器
这些 Header 从开发文档的抓包日志中提取
"""
self.client.headers.update({
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"Accept": "application/json, text/plain, */*",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Accept-Encoding": "gzip, deflate, br",
"Sec-Ch-Ua": '"Chromium";v="143", "Not.A/Brand";v="24"',
"Sec-Ch-Ua-Mobile": "?0",
"Sec-Ch-Ua-Platform": '"Linux"',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"DNT": "1",
"Connection": "keep-alive",
})
def get(self, url: str, **kwargs) -> requests.Response:
"""
发送 GET 请求
参数:
url: 目标 URL
**kwargs: 传递给 requests.get 的其他参数
返回:
Response 对象
抛出:
CloudflareBlockError: 遇到 Cloudflare 拦截
SessionInvalidError: 会话失效409
RateLimitError: 速率限制429
"""
try:
response = self.client.get(url, **kwargs)
return self._handle_response(response, url)
except Exception as e:
logger.error(f"GET request failed: {url} - {e}")
raise
def post(self, url: str, params=None, **kwargs) -> requests.Response:
"""
发送 POST 请求
参数:
url: 目标 URL
params: URL 查询参数(可选)
**kwargs: 传递给 requests.post 的其他参数
返回:
Response 对象
抛出:
CloudflareBlockError: 遇到 Cloudflare 拦截
SessionInvalidError: 会话失效409
RateLimitError: 速率限制429
"""
try:
response = self.client.post(url, params=params, **kwargs)
return self._handle_response(response, url)
except Exception as e:
logger.error(f"POST request failed: {url} - {e}")
raise
def _handle_response(self, response: requests.Response, url: str) -> requests.Response:
"""
统一响应处理和错误检测
参数:
response: curl_cffi 响应对象
url: 请求的 URL用于日志
返回:
原始 Response 对象(如果没有错误)
抛出:
CloudflareBlockError: 检测到 Cloudflare 挑战页面
SessionInvalidError: 检测到 409 会话冲突
RateLimitError: 检测到 429 速率限制
"""
status_code = response.status_code
# 检测 Cloudflare Turnstile 挑战403 + 特征文本)
if status_code == 403:
if self._is_cloudflare_challenge(response):
logger.error(f"Cloudflare challenge detected: {url}")
raise CloudflareBlockError(
f"Cloudflare Turnstile challenge triggered at {url}. "
"Possible solutions: use residential proxy, solve captcha, or retry later."
)
# 检测会话冲突CSRF Token 失效)
if status_code == 409:
logger.error(f"Session conflict (409): {url} - {response.text[:200]}")
raise SessionInvalidError(
f"Session invalid (409 Conflict): {response.text[:200]}. "
"This usually means CSRF token expired or cookie chain broken. "
"Need to restart registration flow."
)
# 检测速率限制
if status_code == 429:
logger.error(f"Rate limit exceeded (429): {url}")
raise RateLimitError(
f"Rate limit exceeded at {url}. "
"Recommendation: slow down requests or change IP/proxy."
)
# 记录其他错误响应4xx, 5xx
if status_code >= 400:
logger.warning(
f"HTTP {status_code} error: {url}\n"
f"Response preview: {response.text[:300]}"
)
# 记录成功响应(调试用)
if status_code < 300:
logger.debug(f"HTTP {status_code} OK: {url}")
return response
@staticmethod
def _is_cloudflare_challenge(response: requests.Response) -> bool:
"""
检测响应是否为 Cloudflare Turnstile 挑战页面
特征:
- 状态码 403
- 包含 "Just a moment""Checking your browser" 等文本
- 包含 Cloudflare 相关 JavaScript
"""
body = response.text.lower()
cloudflare_keywords = [
"just a moment",
"checking your browser",
"cloudflare",
"cf-challenge",
"ray id"
]
return any(keyword in body for keyword in cloudflare_keywords)
@staticmethod
def _mask_proxy(proxy: str) -> str:
"""
脱敏代理地址(隐藏用户名和密码)
例如: http://user:pass@1.2.3.4:8080 -> http://***:***@1.2.3.4:8080
"""
import re
return re.sub(r'://([^:]+):([^@]+)@', r'://***:***@', proxy)
def get_cookies(self) -> Dict[str, str]:
"""
获取当前所有 Cookie
返回:
Cookie 字典 {name: value}
"""
# curl_cffi 的 cookies 可能存在同名不同域的 cookie需要遍历 jar
result = {}
try:
for cookie in self.client.cookies.jar:
# 用 domain:name 作为 key 避免冲突,或者直接覆盖
result[cookie.name] = cookie.value
except Exception:
# 兼容处理
try:
for cookie in self.client.cookies:
result[cookie.name] = cookie.value
except Exception:
pass
return result
def get_cookie(self, name: str) -> Optional[str]:
"""
获取指定名称的 Cookie 值
参数:
name: Cookie 名称
返回:
Cookie 值,不存在则返回 None
"""
return self.client.cookies.get(name)
def set_cookie(self, name: str, value: str, domain: str = None):
"""
设置 Cookie
参数:
name: Cookie 名称
value: Cookie 值
domain: Cookie 作用域(默认 .chatgpt.com
"""
if domain is None:
domain = f".{self.CHATGPT_DOMAIN}"
self.client.cookies.set(name, value, domain=domain)
logger.debug(f"Cookie set: {name}={value[:10]}... (domain={domain})")
def close(self):
"""关闭会话,释放资源"""
try:
self.client.close()
logger.debug("Session closed")
except Exception as e:
logger.warning(f"Error closing session: {e}")
async def login(self, email: str, password: str) -> Dict[str, Any]:
"""
使用邮箱密码登录,获取 access_token
参数:
email: 登录邮箱
password: 登录密码
返回:
登录结果字典,包含:
- status: 状态 (success/failed/...)
- access_token: 访问令牌(成功时)
- session_token: 会话令牌(成功时)
- error: 错误信息(失败时)
示例:
session = OAISession()
result = await session.login("user@example.com", "password123")
if result["status"] == "success":
print(f"Access Token: {result['access_token']}")
"""
from core.login_flow import LoginFlow
flow = LoginFlow(self, email, password)
result = await flow.run()
# 保存登录状态
if result.get("status") == "success":
self.access_token = result.get("access_token")
self.session_token = result.get("session_token")
self.logged_in_email = email
logger.info(f"Session logged in as: {email}")
return result
def is_logged_in(self) -> bool:
"""检查是否已登录"""
return self.access_token is not None
def get_access_token(self) -> Optional[str]:
"""获取当前的 access_token"""
return self.access_token
def __enter__(self):
"""支持 with 语句上下文管理"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""退出上下文时自动关闭"""
self.close()
# 导出主要接口
__all__ = [
"OAISession",
"CloudflareBlockError",
"SessionInvalidError",
"RateLimitError",
]