96 lines
3.7 KiB
Python
96 lines
3.7 KiB
Python
"""Sentinel 反爬机制处理器"""
|
||
|
||
from typing import Optional, Dict, Any
|
||
from utils.logger import logger
|
||
from utils.fingerprint import BrowserFingerprint
|
||
|
||
|
||
class SentinelHandler:
    """Handler for the Sentinel anti-scraping mechanism.

    Wraps a lazily created ``SentinelSolver`` (from ``core.sentinel``) and
    exposes helpers to generate a requirements token, solve an enforcement
    challenge, sanity-check a token's format and download the Sentinel
    script through the owning session.

    Attributes:
        session: The session object passed to the constructor; used for HTTP
            access in :meth:`get_sentinel_script`.
        oai_did: Copied from ``session.oai_did``; written into generated
            tokens under the ``"id"`` key and used to seed the fingerprint.
        fingerprint: ``BrowserFingerprint`` built with
            ``session_id=self.oai_did``.
    """

    # URL fetched by get_sentinel_script().
    SENTINEL_API = "https://chatgpt.com/_next/static/chunks/sentinel.js"
    # Not referenced anywhere inside this class.
    SENTINEL_TOKEN_ENDPOINT = "https://api.openai.com/sentinel/token"

    def __init__(self, session):
        """Bind the handler to *session* and build its browser fingerprint.

        Args:
            session: Object exposing an ``oai_did`` attribute and a ``get``
                method (annotated as ``OAISession``).
        """
        # Function-scope import kept from the original; presumably avoids a
        # circular import with core.session -- TODO confirm.
        from core.session import OAISession

        self.session: OAISession = session
        self.oai_did = session.oai_did
        self.fingerprint = BrowserFingerprint(session_id=self.oai_did)
        # Created on first use by _get_solver().
        self._solver = None
        logger.info("SentinelHandler initialized")

    def _get_solver(self):
        """Return the Sentinel solver, creating it on first call.

        Raises:
            ImportError: If ``core.sentinel.SentinelSolver`` cannot be
                imported.
        """
        if self._solver is None:
            try:
                from core.sentinel import SentinelSolver

                self._solver = SentinelSolver(self.fingerprint)
                logger.debug("SentinelSolver initialized")
            except ImportError as e:
                logger.error(f"Failed to import SentinelSolver: {e}")
                raise ImportError("Sentinel solver not found. Check core/sentinel/ directory.") from e
        return self._solver

    async def get_token(self, flow: str = "username_password_create", **kwargs) -> Dict[str, Any]:
        """Generate a Sentinel token for the given flow.

        The solver's requirements token is extended with ``"flow"`` and
        ``"id"`` (the session's ``oai_did``) and must carry a non-empty
        ``"p"`` entry.

        Args:
            flow: Value stored under the ``"flow"`` key of the token.
            **kwargs: Accepted for call-site compatibility; currently unused.

        Returns:
            The token dictionary produced by the solver, with ``"flow"`` and
            ``"id"`` set.

        Raises:
            NotImplementedError: If the solver module cannot be imported.
            RuntimeError: For any other failure, including a token without a
                usable ``"p"`` field.
        """
        logger.info(f"Generating Sentinel token for flow='{flow}'")

        try:
            solver = self._get_solver()
            token_dict = solver.generate_requirements_token()
            token_dict["flow"] = flow
            token_dict["id"] = self.oai_did

            if "p" not in token_dict or not token_dict["p"]:
                raise ValueError("Generated token missing 'p' field")

            logger.success(f"Sentinel token generated: {str(token_dict)[:50]}...")
            return token_dict

        except ImportError as e:
            logger.error(f"Sentinel solver not available: {e}")
            raise NotImplementedError(f"Sentinel solver import failed: {e}") from e

        except Exception as e:
            # Also catches the ValueError raised above, so callers only ever
            # see NotImplementedError or RuntimeError from this method.
            logger.exception(f"Failed to generate Sentinel token: {e}")
            raise RuntimeError(f"Sentinel token generation failed: {e}") from e

    async def solve_enforcement(self, enforcement_config: Dict[str, Any]) -> str:
        """Solve an enforcement challenge (PoW + Turnstile).

        Args:
            enforcement_config: Challenge configuration handed unchanged to
                the solver's ``solve_enforcement``.

        Returns:
            The value returned by the solver (sliced for logging, so it is
            expected to be a string).

        Raises:
            RuntimeError: If the solver is unavailable or solving fails.
        """
        logger.info("Solving enforcement challenge...")

        try:
            solver = self._get_solver()
            token_json = solver.solve_enforcement(enforcement_config)
            logger.success(f"Enforcement solved: {token_json[:50]}...")
            return token_json

        except Exception as e:
            logger.exception(f"Failed to solve enforcement: {e}")
            raise RuntimeError(f"Enforcement solving failed: {e}") from e

    async def verify_token(self, token: str) -> bool:
        """Check that *token* looks like a Sentinel token.

        Args:
            token: Candidate token.

        Returns:
            ``True`` only for a non-empty string other than
            ``"placeholder_token"`` that starts with ``"gAAAAA"``; ``False``
            otherwise, including for non-string input.
        """
        # A validator should reject wrong-typed input rather than raise:
        # without the isinstance guard a dict (what get_token returns) or
        # bytes would fail on .startswith / slicing below.
        if not isinstance(token, str) or not token or token == "placeholder_token":
            return False
        if not token.startswith("gAAAAA"):
            logger.warning(f"Invalid token format: {token[:20]}...")
            return False
        return True

    def get_sentinel_script(self) -> Optional[str]:
        """Fetch the Sentinel JavaScript source through the session.

        Returns:
            The response text when the request to ``SENTINEL_API`` returns
            status 200; ``None`` on any other status or on any exception
            (failures are logged, not raised).
        """
        try:
            response = self.session.get(self.SENTINEL_API)
            if response.status_code == 200:
                logger.info(f"Sentinel script fetched: {len(response.text)} bytes")
                return response.text
            logger.error(f"Failed to fetch Sentinel script: {response.status_code}")
            return None
        except Exception as e:
            # Deliberate best-effort: callers get None instead of an error.
            logger.error(f"Error fetching Sentinel script: {e}")
            return None
|
||
|
||
|
||
# Public API: the only name exported by a star-import of this module.
__all__ = ["SentinelHandler"]
|