175 lines
5.9 KiB
Python
175 lines
5.9 KiB
Python
# modules/sentinel_solver.py
|
||
"""Sentinel 挑战求解器
|
||
|
||
支持两种模式:
|
||
1. Node.js 模式 (默认): 使用 sdk.js 执行
|
||
2. Native 模式: 纯 Python 实现,无需 Node.js
|
||
"""
|
||
|
||
import json
|
||
import uuid
|
||
from typing import Dict, Optional
|
||
|
||
from .fingerprint import BrowserFingerprint
|
||
from config import DEBUG
|
||
|
||
# 尝试导入 JSExecutor(需要 Node.js)
|
||
try:
|
||
from .js_executor import JSExecutor
|
||
HAS_JS_EXECUTOR = True
|
||
except Exception:
|
||
HAS_JS_EXECUTOR = False
|
||
|
||
# 尝试导入纯 Python 实现
|
||
try:
|
||
from .sentinel_native import NativeSentinelSolver, PowSolver
|
||
HAS_NATIVE_SOLVER = True
|
||
except ImportError:
|
||
HAS_NATIVE_SOLVER = False
|
||
|
||
|
||
class SentinelSolver:
|
||
"""协调指纹生成和求解器,生成完整的 Sentinel tokens
|
||
|
||
支持两种模式:
|
||
- use_native=False (默认): 使用 Node.js + sdk.js
|
||
- use_native=True: 使用纯 Python 实现
|
||
"""
|
||
|
||
def __init__(self, fingerprint: BrowserFingerprint, use_native: bool = False):
|
||
self.fingerprint = fingerprint
|
||
self.use_native = use_native
|
||
|
||
if use_native:
|
||
if not HAS_NATIVE_SOLVER:
|
||
raise ImportError("Native solver not available. Install esprima: pip install esprima")
|
||
self.native_solver = NativeSentinelSolver(fingerprint.session_id)
|
||
self.js_executor = None
|
||
if DEBUG:
|
||
print("[Solver] Using native Python solver")
|
||
else:
|
||
if not HAS_JS_EXECUTOR:
|
||
raise ImportError("JSExecutor not available. Check Node.js installation or use use_native=True")
|
||
self.js_executor = JSExecutor()
|
||
self.native_solver = None
|
||
if DEBUG:
|
||
print("[Solver] Using Node.js executor")
|
||
|
||
def set_context(self, prod_version: str, ip_info: list = None):
|
||
"""设置上下文(仅 native 模式需要)"""
|
||
if self.use_native and self.native_solver:
|
||
self.native_solver.set_context(prod_version, ip_info)
|
||
|
||
def generate_requirements_token(self) -> Dict[str, str]:
|
||
"""
|
||
生成 requirements token(初始化时需要)
|
||
|
||
Returns:
|
||
{'p': 'gAAAAAC...', 'id': 'uuid'}
|
||
"""
|
||
if DEBUG:
|
||
print("[Solver] Generating requirements token...")
|
||
|
||
if self.use_native:
|
||
# 纯 Python 模式
|
||
answer = self.native_solver.generate_requirements_token()
|
||
# native 模式返回完整 token,需要去掉前缀
|
||
if answer.startswith('gAAAAAC'):
|
||
answer = answer[7:] # 移除 'gAAAAAC' 前缀
|
||
else:
|
||
# Node.js 模式
|
||
req_seed = str(uuid.uuid4())
|
||
config_array = self.fingerprint.get_config_array()
|
||
answer = self.js_executor.generate_requirements(req_seed, config_array)
|
||
|
||
token = {
|
||
'p': f'gAAAAAC{answer}',
|
||
'id': self.fingerprint.session_id,
|
||
}
|
||
|
||
if DEBUG:
|
||
print(f"[Solver] Requirements token: {token['p'][:30]}...")
|
||
|
||
return token
|
||
|
||
def solve_enforcement(self, enforcement_config: Dict, p_token: str = None) -> str:
|
||
"""
|
||
解决完整的 enforcement 挑战(PoW + Turnstile)
|
||
|
||
Args:
|
||
enforcement_config: 服务器返回的挑战配置
|
||
{
|
||
'proofofwork': {
|
||
'seed': '...',
|
||
'difficulty': '0003a',
|
||
'token': '...', # cached token
|
||
'turnstile': {
|
||
'dx': '...' # VM bytecode
|
||
}
|
||
}
|
||
}
|
||
p_token: requirements token (用于 Turnstile,仅 native 模式需要)
|
||
|
||
Returns:
|
||
完整的 Sentinel token (JSON string)
|
||
"""
|
||
if DEBUG:
|
||
print("[Solver] Solving enforcement challenge...")
|
||
|
||
pow_data = enforcement_config.get('proofofwork', {})
|
||
|
||
# 1. 解决 PoW
|
||
seed = pow_data['seed']
|
||
difficulty = pow_data['difficulty']
|
||
|
||
if self.use_native:
|
||
# 纯 Python 模式
|
||
pow_answer = self.native_solver.solve_pow(seed, difficulty)
|
||
# native 模式返回完整 token,需要去掉前缀
|
||
if pow_answer.startswith('gAAAAAB'):
|
||
pow_answer = pow_answer[7:] # 移除 'gAAAAAB' 前缀
|
||
else:
|
||
# Node.js 模式
|
||
config_array = self.fingerprint.get_config_array()
|
||
pow_answer = self.js_executor.solve_pow(seed, difficulty, config_array)
|
||
|
||
# 2. 执行 Turnstile(如果有)
|
||
turnstile_result = None
|
||
turnstile_data = pow_data.get('turnstile')
|
||
|
||
if turnstile_data and turnstile_data.get('dx'):
|
||
dx_bytecode = turnstile_data['dx']
|
||
|
||
if self.use_native:
|
||
# 纯 Python 模式
|
||
if p_token:
|
||
turnstile_result = self.native_solver.solve_turnstile(dx_bytecode, p_token)
|
||
else:
|
||
# Node.js 模式
|
||
xor_key = self.fingerprint.session_id
|
||
turnstile_result = self.js_executor.execute_turnstile(dx_bytecode, xor_key)
|
||
|
||
# 3. 构建最终 token
|
||
sentinel_token = {
|
||
# enforcement token 前缀为 gAAAAAB(requirements 为 gAAAAAC)
|
||
'p': f'gAAAAAB{pow_answer}',
|
||
'id': self.fingerprint.session_id,
|
||
'flow': 'username_password_create',
|
||
}
|
||
|
||
# 添加可选字段
|
||
if turnstile_result:
|
||
sentinel_token['t'] = turnstile_result
|
||
|
||
if pow_data.get('token'):
|
||
sentinel_token['c'] = pow_data['token']
|
||
|
||
token_json = json.dumps(sentinel_token)
|
||
|
||
if DEBUG:
|
||
print(f"[Solver] Sentinel token generated: {token_json[:80]}...")
|
||
|
||
return token_json
|
||
|
||
|