Files
autoPlus/core/sentinel/js_executor.py
2026-01-26 16:25:22 +08:00

258 lines
9.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""JavaScript 执行引擎封装
说明:
- OpenAI 的 `assets/sdk.js` 是浏览器脚本,包含多段 anti-debug 代码与 DOM 初始化逻辑。
- 直接用 PyExecJS/ExecJS 在 Node 环境执行时常见表现是「compile 成功但 call 卡死」。
- 这里改为通过 `node -` 运行一段自包含脚本:先做轻量净化 + browser shim,再执行目标函数并强制输出结果。
"""
from __future__ import annotations
import json
import re
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, Optional
from config import load_config
class JSExecutor:
    """Run Sentinel SDK internals through Node.js (supports the async Turnstile VM).

    The SDK file is read once at construction time, lightly sanitized and
    patched so that a few internal objects are reachable from ``SentinelSDK``.
    Every public call then spawns a fresh ``node -`` process that receives a
    self-contained script (browser shims + patched SDK + a small wrapper) on
    stdin and reports its result as a JSON envelope on stdout.
    """

    def __init__(self):
        """Load the configuration, resolve the SDK path and load the SDK.

        Raises:
            FileNotFoundError: If the SDK file does not exist.
            RuntimeError: If the SDK export patch cannot be applied.
        """
        config = load_config()
        self.debug = config.sentinel_debug

        sdk_path = Path(config.sentinel_sdk_path)
        if not sdk_path.is_absolute():
            # Relative paths are resolved against the directory of the
            # launched script (sys.argv[0]), which is treated as the
            # project root.
            project_root = Path(sys.argv[0]).parent.resolve()
            sdk_path = project_root / sdk_path

        self._sdk_path = sdk_path
        self._sdk_code: str = ""
        self._load_sdk()

    def _load_sdk(self) -> None:
        """Read the SDK from disk, sanitize it and patch its exports.

        Raises:
            FileNotFoundError: If ``self._sdk_path`` does not exist.
            RuntimeError: If the export patch cannot be applied.
        """
        if not self._sdk_path.exists():
            raise FileNotFoundError(f"SDK not found at {self._sdk_path}")

        sdk_code = self._sdk_path.read_text(encoding="utf-8")
        sdk_code = self._sanitize_sdk(sdk_code)
        sdk_code = self._inject_internal_exports(sdk_code)
        self._sdk_code = sdk_code

        if self.debug:
            print("[JSExecutor] SDK loaded successfully (sanitized)")

    def _sanitize_sdk(self, sdk_code: str) -> str:
        """Strip anti-debug fragments that hang or crawl when run under Node.

        Args:
            sdk_code: Raw SDK source text.

        Returns:
            The SDK source with the known anti-debug call patterns removed.
        """
        # 1) Known top-level anti-debug calls that sit alone on a line.
        sdk_code = re.sub(r"(?m)^\s*[rugkU]\(\);\s*$", "", sdk_code)
        sdk_code = re.sub(r"(?m)^\s*o\(\);\s*$", "", sdk_code)

        # 2) `Pt(),` inside comma expressions (and the `Pt();` statement form);
        #    only the call is removed so the surrounding syntax stays valid.
        sdk_code = re.sub(r"\bPt\(\),\s*", "", sdk_code)
        sdk_code = re.sub(r"\bPt\(\);\s*", "", sdk_code)

        # 3) Anti-debug call in a class field initializer:
        #    `return n(), "" + Math.random();`
        sdk_code = re.sub(
            r'return\s+n\(\),\s*""\s*\+\s*Math\.random\(\)\s*;',
            'return "" + Math.random();',
            sdk_code,
        )

        # 4) Comma-operator calls such as `if ((e(), cond))`: drop the call,
        #    keep `cond`. Only identifiers of at most three characters are
        #    touched to avoid damaging regular logic, and the Turnstile VM's
        #    `vt()` is deliberately preserved.
        def _strip_comma_call(match: re.Match[str]) -> str:
            if match.group(1) == "vt":
                return match.group(0)
            return "("

        sdk_code = re.sub(
            r"\(\s*([A-Za-z_$][A-Za-z0-9_$]{0,2})\(\)\s*,",
            _strip_comma_call,
            sdk_code,
        )
        return sdk_code

    def _inject_internal_exports(self, sdk_code: str) -> str:
        """Expose SDK-internal objects on ``SentinelSDK`` for external calls.

        The SDK is expected to end with a sequence of the form::

            (t.init = un),
            (t.token = an),
            t
            );

        The first such occurrence is rewritten so that the internals
        ``O``, ``P``, ``bt``, ``kt`` and ``Kt`` are additionally assigned to
        ``t.__O``, ``t.__P``, ``t.__bt``, ``t.__kt`` and ``t.__Kt``.

        Args:
            sdk_code: Sanitized SDK source text.

        Returns:
            The SDK source with the extra exports added.

        Raises:
            RuntimeError: If the expected export sequence is not found.
        """
        pattern = re.compile(
            r"\(\s*t\.init\s*=\s*un\s*\)\s*,\s*\(\s*t\.token\s*=\s*an\s*\)\s*,\s*t\s*\)"
        )
        replacement = (
            "(t.init = un),"
            "(t.token = an),"
            "(t.__O = O),"
            "(t.__P = P),"
            "(t.__bt = bt),"
            "(t.__kt = kt),"
            "(t.__Kt = Kt),"
            "t)"
        )
        new_code, n = pattern.subn(replacement, sdk_code, count=1)
        if n != 1:
            raise RuntimeError("Failed to patch SDK exports; SDK format may have changed.")
        return new_code

    def _node_script(self, payload: Dict[str, Any], entry: str) -> str:
        """Assemble the complete script that is piped to ``node -``.

        Args:
            payload: JSON-serializable data made available to the entry code
                as the JavaScript constant ``__payload``.
            entry: JavaScript statements forming the body of the async
                ``__entry`` function; its return value becomes the result.

        Returns:
            Browser shims, the patched SDK and the wrapper joined by newlines.
        """
        payload_json = json.dumps(payload, ensure_ascii=False)
        shim = r"""
// --- minimal browser shims for Node ---
if (typeof globalThis.window !== "object") globalThis.window = globalThis;
if (!window.top) window.top = window;
if (!window.location) window.location = { href: "https://auth.openai.com/create-account/password", search: "", pathname: "/create-account/password", origin: "https://auth.openai.com" };
if (!window.addEventListener) window.addEventListener = function(){};
if (!window.removeEventListener) window.removeEventListener = function(){};
if (!window.postMessage) window.postMessage = function(){};
if (!window.__sentinel_token_pending) window.__sentinel_token_pending = [];
if (!window.__sentinel_init_pending) window.__sentinel_init_pending = [];
if (typeof globalThis.document !== "object") globalThis.document = {};
if (!document.scripts) document.scripts = [];
if (!document.cookie) document.cookie = "";
if (!document.documentElement) document.documentElement = { getAttribute: () => null };
if (!document.currentScript) document.currentScript = null;
if (!document.body) document.body = { appendChild: function(){}, getAttribute: () => null };
if (!document.createElement) document.createElement = function(tag){
return {
tagName: String(tag||"").toUpperCase(),
style: {},
setAttribute: function(){},
getAttribute: function(){ return null; },
addEventListener: function(){},
removeEventListener: function(){},
src: "",
contentWindow: { postMessage: function(){}, addEventListener: function(){}, removeEventListener: function(){} },
};
};
if (typeof globalThis.navigator !== "object") globalThis.navigator = { userAgent: "ua", language: "en-US", languages: ["en-US","en"], hardwareConcurrency: 8 };
if (typeof globalThis.screen !== "object") globalThis.screen = { width: 1920, height: 1080 };
if (typeof globalThis.btoa !== "function") globalThis.btoa = (str) => Buffer.from(str, "binary").toString("base64");
if (typeof globalThis.atob !== "function") globalThis.atob = (b64) => Buffer.from(b64, "base64").toString("binary");
window.btoa = globalThis.btoa;
window.atob = globalThis.atob;
"""
        # The wrapper always ends the process explicitly once the JSON
        # envelope has been flushed to stdout.
        wrapper = f"""
const __payload = {payload_json};
function __makeSolver(configArray) {{
const solver = new SentinelSDK.__O();
solver.sid = configArray?.[14];
// 强制使用 Python 传入的 configArray避免依赖真实浏览器对象
solver.getConfig = () => configArray;
return solver;
}}
async function __entry() {{
{entry}
}}
(async () => {{
try {{
const result = await __entry();
process.stdout.write(JSON.stringify({{ ok: true, result }}), () => process.exit(0));
}} catch (err) {{
const msg = (err && (err.stack || err.message)) ? (err.stack || err.message) : String(err);
process.stdout.write(JSON.stringify({{ ok: false, error: msg }}), () => process.exit(1));
}}
}})();
"""
        return "\n".join([shim, self._sdk_code, wrapper])

    @staticmethod
    def _parse_node_output(stdout: str, stderr: str) -> Any:
        """Decode the JSON envelope written by the Node wrapper.

        Args:
            stdout: Captured standard output of the Node process.
            stderr: Captured standard error, used only in error messages.

        Returns:
            The ``result`` member of a successful envelope.

        Raises:
            RuntimeError: If there is no output, the output is not a JSON
                object, or the envelope reports a JavaScript error.
        """
        text = (stdout or "").strip()
        if not text:
            raise RuntimeError(f"Node worker produced no output (stderr={stderr!r})")

        try:
            obj = json.loads(text)
        except json.JSONDecodeError as e:
            # The wrapper writes its envelope last, so anything the SDK
            # printed to stdout beforehand can be skipped by retrying on the
            # final line only.
            try:
                obj = json.loads(text.splitlines()[-1].strip())
            except json.JSONDecodeError:
                obj = None
            if not isinstance(obj, dict):
                raise RuntimeError(
                    f"Node worker returned non-JSON output: {text[:200]!r}"
                ) from e

        if not isinstance(obj, dict):
            # Valid JSON that is not an object cannot be the wrapper envelope.
            raise RuntimeError(f"Node worker returned unexpected JSON payload: {text[:200]!r}")
        if not obj.get("ok"):
            raise RuntimeError(obj.get("error", "Unknown JS error"))
        return obj.get("result")

    def _run_node(self, payload: Dict[str, Any], entry: str, timeout_s: int = 30) -> Any:
        """Execute ``entry`` inside a fresh Node process and return its result.

        Args:
            payload: Data exposed to the entry code as ``__payload``.
            entry: JavaScript body of the async entry function.
            timeout_s: Maximum run time of the Node process in seconds.

        Returns:
            The decoded ``result`` value reported by the Node wrapper.

        Raises:
            RuntimeError: If ``node`` is not on PATH, the worker output is
                missing or malformed, or the JavaScript code raised.
            TimeoutError: If the worker exceeds ``timeout_s``.
        """
        script = self._node_script(payload, entry)
        if self.debug:
            print("[JSExecutor] Running Node worker...")

        try:
            proc = subprocess.run(
                ["node", "-"],
                input=script,
                text=True,
                # The script contains non-ASCII text; pin UTF-8 so the pipe
                # encoding does not depend on the locale.
                encoding="utf-8",
                capture_output=True,
                timeout=timeout_s,
            )
        except FileNotFoundError as e:
            raise RuntimeError("Node.js not found on PATH (required for Sentinel SDK execution).") from e
        except subprocess.TimeoutExpired as e:
            raise TimeoutError(f"Node worker timed out after {timeout_s}s") from e

        return self._parse_node_output(proc.stdout, proc.stderr)

    def solve_pow(self, seed: str, difficulty: str, config_array: list) -> str:
        """Solve a proof-of-work challenge with the SDK solver.

        Calls ``_generateAnswerSync(seed, difficulty)`` on a solver whose
        ``getConfig`` returns ``config_array``.

        Args:
            seed: Challenge seed.
            difficulty: Challenge difficulty.
            config_array: Config array handed to the solver.

        Returns:
            The value returned by the SDK solver.
        """
        if self.debug:
            print(f"[JSExecutor] Solving PoW: seed={seed[:10]}..., difficulty={difficulty}")

        result = self._run_node(
            {"seed": seed, "difficulty": difficulty, "configArray": config_array},
            entry="return __makeSolver(__payload.configArray)._generateAnswerSync(__payload.seed, __payload.difficulty);",
            timeout_s=60,
        )

        if self.debug and isinstance(result, str):
            print(f"[JSExecutor] PoW solved: {result[:50]}...")
        return result

    def generate_requirements(self, seed: str, config_array: list) -> str:
        """Generate a requirements token with the SDK solver.

        Sets ``requirementsSeed`` on the solver and calls
        ``_generateRequirementsTokenAnswerBlocking()``.

        Args:
            seed: Value assigned to the solver's ``requirementsSeed``.
            config_array: Config array handed to the solver.

        Returns:
            The value returned by the SDK solver.
        """
        result = self._run_node(
            {"seed": seed, "configArray": config_array},
            entry=(
                "const solver = __makeSolver(__payload.configArray);\n"
                "solver.requirementsSeed = __payload.seed;\n"
                "return solver._generateRequirementsTokenAnswerBlocking();"
            ),
            timeout_s=30,
        )
        return result

    def execute_turnstile(self, dx_bytecode: str, xor_key: str) -> str:
        """Run the Turnstile VM on the given bytecode.

        Calls ``SentinelSDK.__kt(xor_key)`` and then awaits
        ``SentinelSDK.__bt(dx_bytecode)``.

        Args:
            dx_bytecode: Value passed to ``SentinelSDK.__bt``.
            xor_key: Value passed to ``SentinelSDK.__kt``.

        Returns:
            The awaited result of ``SentinelSDK.__bt``.
        """
        if self.debug:
            print("[JSExecutor] Executing Turnstile VM...")

        result = self._run_node(
            {"dx": dx_bytecode, "xorKey": xor_key},
            entry=(
                "SentinelSDK.__kt(__payload.xorKey);\n"
                "return await SentinelSDK.__bt(__payload.dx);"
            ),
            timeout_s=30,
        )

        if self.debug and isinstance(result, str):
            print(f"[JSExecutor] Turnstile result: {result[:50]}...")
        return result