246 lines
9.2 KiB
Python
246 lines
9.2 KiB
Python
"""JavaScript 执行引擎封装
|
||
|
||
说明:
|
||
- OpenAI 的 `assets/sdk.js` 是浏览器脚本,包含多段 anti-debug 代码与 DOM 初始化逻辑。
|
||
- 直接用 PyExecJS/ExecJS 在 Node 环境执行时,常见表现是「compile 成功但 call 卡死」。
|
||
- 这里改为通过 `node -` 运行一段自包含脚本:先做轻量净化 + browser shim,再执行目标函数并强制输出结果。
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import re
|
||
import subprocess
|
||
from pathlib import Path
|
||
from typing import Any, Dict, Optional
|
||
|
||
from reference.config import DEBUG, SDK_JS_PATH
|
||
|
||
|
||
class JSExecutor:
    """Run Sentinel SDK internals under Node.js (supports the async Turnstile VM).

    The SDK file is read once at construction time, stripped of the
    anti-debug calls that make it hang under Node, and patched so that a few
    internal symbols are reachable through the ``SentinelSDK`` global. Each
    public method then spawns a short-lived ``node -`` process that receives
    a self-contained script on stdin (browser shims + patched SDK + a small
    wrapper) and prints a single JSON envelope on stdout.
    """

    # Minimal browser globals the SDK touches while initialising under Node.
    # Kept as a raw string: it is JavaScript source, not Python.
    _BROWSER_SHIM = r"""
// --- minimal browser shims for Node ---
if (typeof globalThis.window !== "object") globalThis.window = globalThis;
if (!window.top) window.top = window;
if (!window.location) window.location = { href: "https://auth.openai.com/create-account/password", search: "", pathname: "/create-account/password", origin: "https://auth.openai.com" };
if (!window.addEventListener) window.addEventListener = function(){};
if (!window.removeEventListener) window.removeEventListener = function(){};
if (!window.postMessage) window.postMessage = function(){};
if (!window.__sentinel_token_pending) window.__sentinel_token_pending = [];
if (!window.__sentinel_init_pending) window.__sentinel_init_pending = [];

if (typeof globalThis.document !== "object") globalThis.document = {};
if (!document.scripts) document.scripts = [];
if (!document.cookie) document.cookie = "";
if (!document.documentElement) document.documentElement = { getAttribute: () => null };
if (!document.currentScript) document.currentScript = null;
if (!document.body) document.body = { appendChild: function(){}, getAttribute: () => null };
if (!document.createElement) document.createElement = function(tag){
  return {
    tagName: String(tag||"").toUpperCase(),
    style: {},
    setAttribute: function(){},
    getAttribute: function(){ return null; },
    addEventListener: function(){},
    removeEventListener: function(){},
    src: "",
    contentWindow: { postMessage: function(){}, addEventListener: function(){}, removeEventListener: function(){} },
  };
};

if (typeof globalThis.navigator !== "object") globalThis.navigator = { userAgent: "ua", language: "en-US", languages: ["en-US","en"], hardwareConcurrency: 8 };
if (typeof globalThis.screen !== "object") globalThis.screen = { width: 1920, height: 1080 };
if (typeof globalThis.btoa !== "function") globalThis.btoa = (str) => Buffer.from(str, "binary").toString("base64");
if (typeof globalThis.atob !== "function") globalThis.atob = (b64) => Buffer.from(b64, "base64").toString("binary");
window.btoa = globalThis.btoa;
window.atob = globalThis.atob;
"""

    # The SDK bundle is expected to end with:
    #   (t.init = un),
    #   (t.token = an),
    #   t
    #   );
    _EXPORTS_PATTERN = re.compile(
        r"\(\s*t\.init\s*=\s*un\s*\)\s*,\s*\(\s*t\.token\s*=\s*an\s*\)\s*,\s*t\s*\)",
        re.MULTILINE,
    )

    # Same tail, with the internal symbols additionally attached to `t`.
    _EXPORTS_REPLACEMENT = (
        "(t.init = un),"
        "(t.token = an),"
        "(t.__O = O),"
        "(t.__P = P),"
        "(t.__bt = bt),"
        "(t.__kt = kt),"
        "(t.__Kt = Kt),"
        "t)"
    )

    def __init__(self):
        """Load, sanitise and patch the SDK source.

        Raises:
            FileNotFoundError: the SDK file is missing.
            RuntimeError: the SDK tail could not be patched.
        """
        self._sdk_code: str = ""
        self._load_sdk()

    def _load_sdk(self) -> None:
        """Read the SDK from ``SDK_JS_PATH`` and store the patched source.

        Raises:
            FileNotFoundError: ``SDK_JS_PATH`` is not an existing regular file.
            RuntimeError: propagated from ``_inject_internal_exports``.
        """
        sdk_path = Path(SDK_JS_PATH)
        # is_file() rather than exists(): a directory at that path should give
        # the same clear error instead of a failure inside read_text().
        if not sdk_path.is_file():
            raise FileNotFoundError(f"SDK not found at {SDK_JS_PATH}")

        sdk_code = sdk_path.read_text(encoding="utf-8")
        sdk_code = self._sanitize_sdk(sdk_code)
        sdk_code = self._inject_internal_exports(sdk_code)

        self._sdk_code = sdk_code

        if DEBUG:
            print("[JSExecutor] SDK loaded successfully (sanitized)")

    def _sanitize_sdk(self, sdk_code: str) -> str:
        """Remove anti-debug fragments that hang or crawl under Node.

        Args:
            sdk_code: raw SDK source text.

        Returns:
            The source with the known anti-debug call patterns removed.
        """
        # 1) Drop a few known top-level anti-debug calls that sit alone on a line.
        sdk_code = re.sub(r"(?m)^\s*[rugkU]\(\);\s*$", "", sdk_code)
        sdk_code = re.sub(r"(?m)^\s*o\(\);\s*$", "", sdk_code)

        # 2) Drop `Pt(),` inside comma expressions (and the `Pt();` statement
        #    form) without breaking the surrounding syntax.
        sdk_code = re.sub(r"\bPt\(\),\s*", "", sdk_code)
        sdk_code = re.sub(r"\bPt\(\);\s*", "", sdk_code)

        # 3) Drop the anti-debug call in class-field initialisers:
        #    `return n(), "" + Math.random();`
        sdk_code = re.sub(
            r'return\s+n\(\),\s*""\s*\+\s*Math\.random\(\)\s*;',
            'return "" + Math.random();',
            sdk_code,
        )

        # 4) Drop the leading call in `if ((e(), cond))`-style comma
        #    expressions, keeping `cond`. Only identifiers of at most three
        #    characters are touched to avoid damaging regular logic; the
        #    Turnstile VM's `vt()` is deliberately preserved.
        def _strip_comma_call(match: "re.Match[str]") -> str:
            if match.group(1) == "vt":
                return match.group(0)
            return "("

        sdk_code = re.sub(
            r"\(\s*([A-Za-z_$][A-Za-z0-9_$]{0,2})\(\)\s*,",
            _strip_comma_call,
            sdk_code,
        )
        return sdk_code

    def _inject_internal_exports(self, sdk_code: str) -> str:
        """Expose SDK-internal objects on ``SentinelSDK`` so they can be called.

        Args:
            sdk_code: sanitised SDK source text.

        Returns:
            The source with ``__O``, ``__P``, ``__bt``, ``__kt`` and ``__Kt``
            attached to the exported object.

        Raises:
            RuntimeError: the expected export tail was not found.
        """
        new_code, n = self._EXPORTS_PATTERN.subn(
            self._EXPORTS_REPLACEMENT, sdk_code, count=1
        )
        if n != 1:
            raise RuntimeError("Failed to patch SDK exports; SDK format may have changed.")
        return new_code

    def _node_script(self, payload: Dict[str, Any], entry: str) -> str:
        """Assemble the full script piped to ``node -``.

        Args:
            payload: JSON-serialisable data exposed to JS as ``__payload``.
            entry: JavaScript statements forming the body of the async
                ``__entry`` function; its return value becomes the result.

        Returns:
            Browser shims, the patched SDK and the wrapper joined by newlines.
        """
        # Default (ASCII-escaped) JSON on purpose: the text is pasted into JS
        # source, and `\uXXXX` escapes evaluate to the same strings while
        # avoiding raw U+2028/U+2029 (a syntax error in string literals before
        # ES2019) and lone surrogates (which cannot be UTF-8 encoded for stdin).
        payload_json = json.dumps(payload)

        wrapper = f"""
const __payload = {payload_json};

function __makeSolver(configArray) {{
  const solver = new SentinelSDK.__O();
  solver.sid = configArray?.[14];
  // 强制使用 Python 传入的 configArray,避免依赖真实浏览器对象
  solver.getConfig = () => configArray;
  return solver;
}}

async function __entry() {{
{entry}
}}

(async () => {{
  try {{
    const result = await __entry();
    process.stdout.write(JSON.stringify({{ ok: true, result }}), () => process.exit(0));
  }} catch (err) {{
    const msg = (err && (err.stack || err.message)) ? (err.stack || err.message) : String(err);
    process.stdout.write(JSON.stringify({{ ok: false, error: msg }}), () => process.exit(1));
  }}
}})();
"""
        return "\n".join([self._BROWSER_SHIM, self._sdk_code, wrapper])

    def _run_node(self, payload: Dict[str, Any], entry: str, timeout_s: int = 30) -> Any:
        """Execute ``entry`` in a fresh Node process and return its result.

        Args:
            payload: data exposed to the script as ``__payload``.
            entry: JavaScript body of the async entry function.
            timeout_s: seconds to wait before giving up on the Node process.

        Returns:
            The ``result`` field of the JSON envelope printed by the worker
            (``None`` when the field is absent).

        Raises:
            RuntimeError: Node is not installed, the worker printed nothing or
                something that is not the expected JSON object, or the script
                reported a failure.
            TimeoutError: the worker did not finish within ``timeout_s``.
        """
        script = self._node_script(payload, entry)

        if DEBUG:
            print("[JSExecutor] Running Node worker...")

        try:
            proc = subprocess.run(
                ["node", "-"],
                input=script,
                text=True,
                # The script contains non-ASCII text and Node reads stdin as
                # UTF-8; without this the locale codec is used, which fails or
                # garbles the source on non-UTF-8 systems.
                encoding="utf-8",
                capture_output=True,
                timeout=timeout_s,
            )
        except FileNotFoundError as e:
            raise RuntimeError("Node.js not found on PATH (required for Sentinel SDK execution).") from e
        except subprocess.TimeoutExpired as e:
            raise TimeoutError(f"Node worker timed out after {timeout_s}s") from e

        stdout = (proc.stdout or "").strip()
        if not stdout:
            raise RuntimeError(f"Node worker produced no output (stderr={proc.stderr!r})")

        try:
            obj = json.loads(stdout)
        except json.JSONDecodeError as e:
            raise RuntimeError(f"Node worker returned non-JSON output: {stdout[:200]!r}") from e

        # Valid JSON that is not the `{ok, ...}` envelope (e.g. a bare number
        # printed by the SDK) must not surface as an AttributeError.
        if not isinstance(obj, dict):
            raise RuntimeError(f"Node worker returned unexpected output: {stdout[:200]!r}")

        if not obj.get("ok"):
            raise RuntimeError(obj.get("error", "Unknown JS error"))

        return obj.get("result")

    def solve_pow(self, seed: str, difficulty: str, config_array: list) -> str:
        """Solve the proof-of-work challenge via the SDK's synchronous solver.

        Args:
            seed: challenge seed passed to the solver.
            difficulty: challenge difficulty passed to the solver.
            config_array: configuration array the solver returns from
                ``getConfig`` instead of probing a real browser.

        Returns:
            Whatever ``_generateAnswerSync`` returns in the worker.
        """
        if DEBUG:
            print(f"[JSExecutor] Solving PoW: seed={seed[:10]}..., difficulty={difficulty}")

        result = self._run_node(
            {"seed": seed, "difficulty": difficulty, "configArray": config_array},
            entry="return __makeSolver(__payload.configArray)._generateAnswerSync(__payload.seed, __payload.difficulty);",
            timeout_s=60,
        )

        if DEBUG and isinstance(result, str):
            print(f"[JSExecutor] PoW solved: {result[:50]}...")

        return result

    def generate_requirements(self, seed: str, config_array: list) -> str:
        """Generate the requirements token answer for ``seed``.

        Args:
            seed: value assigned to the solver's ``requirementsSeed``.
            config_array: configuration array the solver returns from
                ``getConfig``.

        Returns:
            Whatever ``_generateRequirementsTokenAnswerBlocking`` returns in
            the worker.
        """
        result = self._run_node(
            {"seed": seed, "configArray": config_array},
            entry=(
                "const solver = __makeSolver(__payload.configArray);\n"
                "solver.requirementsSeed = __payload.seed;\n"
                "return solver._generateRequirementsTokenAnswerBlocking();"
            ),
            timeout_s=30,
        )
        return result

    def execute_turnstile(self, dx_bytecode: str, xor_key: str) -> str:
        """Run the Turnstile VM on ``dx_bytecode``.

        Args:
            dx_bytecode: value passed to the SDK's exported ``__bt``.
            xor_key: value passed to the SDK's exported ``__kt`` beforehand.

        Returns:
            The awaited result of ``__bt`` in the worker.
        """
        if DEBUG:
            print("[JSExecutor] Executing Turnstile VM...")

        result = self._run_node(
            {"dx": dx_bytecode, "xorKey": xor_key},
            entry=(
                "SentinelSDK.__kt(__payload.xorKey);\n"
                "return await SentinelSDK.__bt(__payload.dx);"
            ),
            timeout_s=30,
        )

        if DEBUG and isinstance(result, str):
            print(f"[JSExecutor] Turnstile result: {result[:50]}...")

        return result
|