"""JavaScript execution engine wrapper.

Notes:
- OpenAI's `assets/sdk.js` is a browser script that contains several
  anti-debug snippets and DOM initialisation logic.
- Running it directly through PyExecJS/ExecJS in a Node environment commonly
  shows up as "compile succeeds but call hangs".
- Instead, a self-contained script is run via `node -`: a light sanitising
  pass plus a browser shim come first, then the target function is executed
  and its result is forcibly written to stdout.
"""

from __future__ import annotations

import json
import re
import subprocess
from pathlib import Path
from typing import Any, Dict, Optional

from reference.config import DEBUG, SDK_JS_PATH


class JSExecutor:
    """Run Sentinel SDK internals through Node.js (supports the async Turnstile VM)."""

    def __init__(self) -> None:
        """Load, sanitise and patch the SDK source once, at construction time."""
        # Patched SDK source; filled in by _load_sdk().
        self._sdk_code: str = ""
        self._load_sdk()

    def _load_sdk(self) -> None:
        """Read the SDK from ``SDK_JS_PATH`` and store the patched source.

        Raises:
            FileNotFoundError: if ``SDK_JS_PATH`` does not exist.
            RuntimeError: if the export patch cannot be applied
                (raised by ``_inject_internal_exports``).
        """
        sdk_path = Path(SDK_JS_PATH)
        if not sdk_path.exists():
            raise FileNotFoundError(f"SDK not found at {SDK_JS_PATH}")

        sdk_code = sdk_path.read_text(encoding="utf-8")
        sdk_code = self._sanitize_sdk(sdk_code)
        sdk_code = self._inject_internal_exports(sdk_code)
        self._sdk_code = sdk_code

        if DEBUG:
            print("[JSExecutor] SDK loaded successfully (sanitized)")

    def _sanitize_sdk(self, sdk_code: str) -> str:
        """Strip anti-debug fragments that hang or crawl under Node.

        Args:
            sdk_code: raw SDK JavaScript source.

        Returns:
            The source with the known anti-debug call sites removed.
        """
        # 1) Drop a few known top-level anti-debug calls (each alone on its line).
        sdk_code = re.sub(r"(?m)^\s*[rugkU]\(\);\s*$", "", sdk_code)
        sdk_code = re.sub(r"(?m)^\s*o\(\);\s*$", "", sdk_code)

        # 2) Drop anti-debug calls such as `Pt(),` inside comma expressions
        #    (removing the call together with its separator keeps the syntax valid).
        sdk_code = re.sub(r"\bPt\(\),\s*", "", sdk_code)
        sdk_code = re.sub(r"\bPt\(\);\s*", "", sdk_code)

        # 3) Drop the anti-debug call in a class field initialiser:
        #    `return n(), "" + Math.random();`
        sdk_code = re.sub(
            r'return\s+n\(\),\s*""\s*\+\s*Math\.random\(\)\s*;',
            'return "" + Math.random();',
            sdk_code,
        )

        # 4) Drop comma-style anti-debug calls like `if ((e(), cond))`, keeping cond.
        #    Only very short identifiers are handled to avoid damaging normal
        #    logic; the Turnstile VM's `vt()` is preserved.
        def _strip_comma_call(match: re.Match[str]) -> str:
            fn = match.group(1)
            if fn == "vt":
                return match.group(0)
            return "("

        sdk_code = re.sub(
            r"\(\s*([A-Za-z_$][A-Za-z0-9_$]{0,2})\(\)\s*,",
            _strip_comma_call,
            sdk_code,
        )

        return sdk_code

    def _inject_internal_exports(self, sdk_code: str) -> str:
        """Expose SDK-internal objects on ``SentinelSDK`` so they can be called externally.

        Args:
            sdk_code: sanitised SDK JavaScript source.

        Returns:
            The source with extra ``t.__O`` / ``t.__P`` / ``t.__bt`` / ``t.__kt`` /
            ``t.__Kt`` assignments added to the export expression.

        Raises:
            RuntimeError: if the expected export expression is not found exactly once.
        """
        # The tail of the SDK usually looks like:
        #   (t.init = un),
        #   (t.token = an),
        #   t
        # );
        pattern = re.compile(
            r"\(\s*t\.init\s*=\s*un\s*\)\s*,\s*\(\s*t\.token\s*=\s*an\s*\)\s*,\s*t\s*\)",
            re.MULTILINE,
        )
        replacement = (
            "(t.init = un),"
            "(t.token = an),"
            "(t.__O = O),"
            "(t.__P = P),"
            "(t.__bt = bt),"
            "(t.__kt = kt),"
            "(t.__Kt = Kt),"
            "t)"
        )
        new_code, n = pattern.subn(replacement, sdk_code, count=1)
        if n != 1:
            raise RuntimeError("Failed to patch SDK exports; SDK format may have changed.")
        return new_code

    def _node_script(self, payload: Dict[str, Any], entry: str) -> str:
        """Assemble the complete script fed to ``node -``.

        The script is the browser shim, then the patched SDK, then a wrapper
        that runs ``entry`` and writes a JSON envelope to stdout.

        Args:
            payload: JSON-serialisable data exposed to the script as ``__payload``.
            entry: JavaScript statements used as the body of the async
                ``__entry`` function; its return value becomes the result.

        Returns:
            The full JavaScript source as one string.
        """
        payload_json = json.dumps(payload, ensure_ascii=False)

        shim = r"""
// --- minimal browser shims for Node ---
if (typeof globalThis.window !== "object") globalThis.window = globalThis;
if (!window.top) window.top = window;
if (!window.location) window.location = { href: "https://auth.openai.com/create-account/password", search: "", pathname: "/create-account/password", origin: "https://auth.openai.com" };
if (!window.addEventListener) window.addEventListener = function(){};
if (!window.removeEventListener) window.removeEventListener = function(){};
if (!window.postMessage) window.postMessage = function(){};
if (!window.__sentinel_token_pending) window.__sentinel_token_pending = [];
if (!window.__sentinel_init_pending) window.__sentinel_init_pending = [];

if (typeof globalThis.document !== "object") globalThis.document = {};
if (!document.scripts) document.scripts = [];
if (!document.cookie) document.cookie = "";
if (!document.documentElement) document.documentElement = { getAttribute: () => null };
if (!document.currentScript) document.currentScript = null;
if (!document.body) document.body = { appendChild: function(){}, getAttribute: () => null };
if (!document.createElement) document.createElement = function(tag){
  return {
    tagName: String(tag||"").toUpperCase(),
    style: {},
    setAttribute: function(){},
    getAttribute: function(){ return null; },
    addEventListener: function(){},
    removeEventListener: function(){},
    src: "",
    contentWindow: { postMessage: function(){}, addEventListener: function(){}, removeEventListener: function(){} },
  };
};

if (typeof globalThis.navigator !== "object") globalThis.navigator = { userAgent: "ua", language: "en-US", languages: ["en-US","en"], hardwareConcurrency: 8 };
if (typeof globalThis.screen !== "object") globalThis.screen = { width: 1920, height: 1080 };

if (typeof globalThis.btoa !== "function") globalThis.btoa = (str) => Buffer.from(str, "binary").toString("base64");
if (typeof globalThis.atob !== "function") globalThis.atob = (b64) => Buffer.from(b64, "base64").toString("binary");
window.btoa = globalThis.btoa;
window.atob = globalThis.atob;
"""

        # f-string: literal JS braces are doubled; only {payload_json} and
        # {entry} are substituted.
        wrapper = f"""
const __payload = {payload_json};

function __makeSolver(configArray) {{
  const solver = new SentinelSDK.__O();
  solver.sid = configArray?.[14];
  // 强制使用 Python 传入的 configArray,避免依赖真实浏览器对象
  solver.getConfig = () => configArray;
  return solver;
}}

async function __entry() {{
{entry}
}}

(async () => {{
  try {{
    const result = await __entry();
    process.stdout.write(JSON.stringify({{ ok: true, result }}), () => process.exit(0));
  }} catch (err) {{
    const msg = (err && (err.stack || err.message)) ? (err.stack || err.message) : String(err);
    process.stdout.write(JSON.stringify({{ ok: false, error: msg }}), () => process.exit(1));
  }}
}})();
"""

        return "\n".join([shim, self._sdk_code, wrapper])

    def _run_node(self, payload: Dict[str, Any], entry: str, timeout_s: int = 30) -> Any:
        """Run ``entry`` inside a fresh Node process and return its result.

        Args:
            payload: JSON-serialisable data exposed to the script as ``__payload``.
            entry: JavaScript body of the async ``__entry`` function.
            timeout_s: seconds to wait for the Node process before giving up.

        Returns:
            The ``result`` field of the JSON envelope printed by the worker
            (``None`` when the field is absent).

        Raises:
            RuntimeError: if ``node`` is not on PATH, the worker prints nothing,
                prints something that is not the expected JSON object, or
                reports ``ok: false``.
            TimeoutError: if the worker does not finish within ``timeout_s``.
        """
        script = self._node_script(payload, entry)

        if DEBUG:
            print("[JSExecutor] Running Node worker...")

        try:
            proc = subprocess.run(
                ["node", "-"],
                input=script,
                text=True,
                # The script always contains non-ASCII text (wrapper comment,
                # payload dumped with ensure_ascii=False). Pin UTF-8 so the
                # pipes do not depend on the platform locale encoding.
                encoding="utf-8",
                capture_output=True,
                timeout=timeout_s,
            )
        except FileNotFoundError as e:
            raise RuntimeError("Node.js not found on PATH (required for Sentinel SDK execution).") from e
        except subprocess.TimeoutExpired as e:
            raise TimeoutError(f"Node worker timed out after {timeout_s}s") from e

        stdout = (proc.stdout or "").strip()
        if not stdout:
            raise RuntimeError(f"Node worker produced no output (stderr={proc.stderr!r})")

        try:
            obj = json.loads(stdout)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Node worker returned non-JSON output: {stdout[:200]!r}") from e

        # Valid JSON that is not an object (e.g. a bare number or string) is
        # a protocol violation, not something to call .get() on.
        if not isinstance(obj, dict):
            raise RuntimeError(f"Node worker returned unexpected JSON output: {stdout[:200]!r}")

        if not obj.get("ok"):
            raise RuntimeError(obj.get("error", "Unknown JS error"))

        return obj.get("result")

    def solve_pow(self, seed: str, difficulty: str, config_array: list) -> str:
        """Call the solver's ``_generateAnswerSync(seed, difficulty)`` in Node.

        Args:
            seed: seed value forwarded to the solver.
            difficulty: difficulty value forwarded to the solver.
            config_array: array the solver's ``getConfig()`` is forced to return.

        Returns:
            Whatever the SDK call returns (passed through unchanged).
        """
        if DEBUG:
            print(f"[JSExecutor] Solving PoW: seed={seed[:10]}..., difficulty={difficulty}")

        result = self._run_node(
            {"seed": seed, "difficulty": difficulty, "configArray": config_array},
            entry="return __makeSolver(__payload.configArray)._generateAnswerSync(__payload.seed, __payload.difficulty);",
            timeout_s=60,
        )

        if DEBUG and isinstance(result, str):
            print(f"[JSExecutor] PoW solved: {result[:50]}...")

        return result

    def generate_requirements(self, seed: str, config_array: list) -> str:
        """Call the solver's ``_generateRequirementsTokenAnswerBlocking()`` in Node.

        Args:
            seed: value assigned to ``solver.requirementsSeed`` before the call.
            config_array: array the solver's ``getConfig()`` is forced to return.

        Returns:
            Whatever the SDK call returns (passed through unchanged).
        """
        result = self._run_node(
            {"seed": seed, "configArray": config_array},
            entry=(
                "const solver = __makeSolver(__payload.configArray);\n"
                "solver.requirementsSeed = __payload.seed;\n"
                "return solver._generateRequirementsTokenAnswerBlocking();"
            ),
            timeout_s=30,
        )
        return result

    def execute_turnstile(self, dx_bytecode: str, xor_key: str) -> str:
        """Run the SDK's exported ``__kt`` then await ``__bt`` in Node.

        Args:
            dx_bytecode: value passed to ``SentinelSDK.__bt``.
            xor_key: value passed to ``SentinelSDK.__kt`` beforehand.

        Returns:
            Whatever ``SentinelSDK.__bt`` resolves to (passed through unchanged).
        """
        if DEBUG:
            print("[JSExecutor] Executing Turnstile VM...")

        result = self._run_node(
            {"dx": dx_bytecode, "xorKey": xor_key},
            entry=(
                "SentinelSDK.__kt(__payload.xorKey);\n"
                "return await SentinelSDK.__bt(__payload.dx);"
            ),
            timeout_s=30,
        )

        if DEBUG and isinstance(result, str):
            print(f"[JSExecutor] Turnstile result: {result[:50]}...")

        return result