Files
AutoDoneTeam/modules/sentinel_native.py
2026-01-26 22:23:30 +08:00

1108 lines
49 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# modules/sentinel_native.py
"""
纯 Python 实现的 Sentinel 挑战求解器
包含 PoW 求解和 Turnstile VM 字节码解释器
无需依赖 Node.js 或 sdk.js
基于 docs/senti.py 的逆向算法集成
"""
import re
import json
import base64
import time
import random
import uuid
import datetime
from zoneinfo import ZoneInfo
from typing import Optional, Dict, List, Any, Tuple
try:
import esprima
except ImportError:
esprima = None # 如果没有 Turnstile可以不安装
from config import DEBUG, FINGERPRINT_CONFIG
# ==========================================
# Utils
# ==========================================
class Utils:
@staticmethod
def between(main_text: Optional[str], value_1: Optional[str], value_2: Optional[str]) -> str:
"""从文本中提取两个标记之间的内容"""
try:
return main_text.split(value_1)[1].split(value_2)[0]
except Exception:
return ""
# ==========================================
# PoW 挑战求解器
# ==========================================
class PowSolver:
"""纯 Python 实现的 Proof of Work 求解器"""
@staticmethod
def encode(e) -> str:
"""Base64 编码配置数组"""
if isinstance(e, (dict, list)):
e = json.dumps(e, separators=(",", ":"))
encoded = str(e).encode("utf-8")
return base64.b64encode(encoded).decode()
@staticmethod
def mod(e: str) -> str:
"""FNV-1a 哈希变种 + murmur3 混合"""
t = 2166136261
for ch in e:
t ^= ord(ch)
t = (t * 16777619) & 0xFFFFFFFF
t ^= (t >> 16)
t = (t * 2246822507) & 0xFFFFFFFF
t ^= (t >> 13)
t = (t * 3266489909) & 0xFFFFFFFF
t ^= (t >> 16)
return f"{t:08x}"
@staticmethod
def _run_check(t0: int, seed: str, difficulty: str, nonce: int, config: List) -> Optional[str]:
"""单次检查 nonce 是否满足难度要求"""
config[3] = nonce
config[9] = round(time.time() * 1000 - t0)
encoded = PowSolver.encode(config)
if PowSolver.mod(seed + encoded)[:len(difficulty)] <= difficulty:
return f"{encoded}~S"
return None
@staticmethod
def solve(seed: str, difficulty: str, config: List, max_iterations: int = 500000) -> Optional[str]:
"""
解决 PoW 挑战
Args:
seed: 服务器返回的 seed
difficulty: 难度字符串(十六进制前缀)
config: 指纹配置数组
max_iterations: 最大尝试次数
Returns:
解决方案字符串(带 ~S 后缀)或 None
"""
if DEBUG:
print(f"[PowSolver] Solving: seed={seed[:20]}..., difficulty={difficulty}")
t0 = int(time.time() * 1000)
for i in range(max_iterations):
result = PowSolver._run_check(t0, seed, difficulty, i, config.copy())
if result:
if DEBUG:
elapsed = (time.time() * 1000 - t0) / 1000
print(f"[PowSolver] ✓ Solved in {elapsed:.2f}s (nonce={i})")
return "gAAAAAB" + result
# 进度输出
if DEBUG and i > 0 and i % 100000 == 0:
print(f"[PowSolver] Tried {i:,} iterations...")
if DEBUG:
print(f"[PowSolver] ✗ Failed after {max_iterations:,} iterations")
return None
@staticmethod
def generate_requirements_token(config: List) -> str:
"""生成 requirements token (初始化用)"""
t = "e"
n = time.time() * 1000
try:
config[3] = 1
config[9] = round(time.time() * 1000 - n)
return "gAAAAAC" + PowSolver.encode(config)
except Exception as e:
t = PowSolver.encode(str(e))
return "error_" + t
# ==========================================
# VM 字节码反编译器
# ==========================================
class VMParser:
"""AST 解析器 - 用于解析反编译后的代码"""
@staticmethod
def find_var_definition(var_name: str, start_line: int, code: str) -> Optional[str]:
"""在代码中查找变量定义"""
if esprima is None:
return None
code_lines = code.splitlines()
relevant_code = '\n'.join(code_lines[:start_line - 1])
try:
sub_ast = esprima.parseScript(relevant_code, {'loc': True, 'range': True, 'tolerant': True})
except Exception:
return None
var_defs = {}
def collect_var_defs(node, var_defs):
if (node.type == 'VariableDeclarator' and
hasattr(node, 'id') and node.id and
hasattr(node, 'init') and node.init and
hasattr(node, 'loc') and node.loc):
id_name = node.id.name if hasattr(node.id, 'name') else None
if not id_name:
return
abs_line = node.loc.start.line if hasattr(node.loc.start, 'line') else None
if abs_line is None or abs_line >= start_line:
return
if hasattr(node.init, 'range'):
value = relevant_code[node.init.range[0]:node.init.range[1]].strip()
else:
value = str(node.init).strip() if node.init else ''
if id_name not in var_defs:
var_defs[id_name] = []
var_defs[id_name].append({'line': abs_line, 'value': value})
def iterative_traverse(ast, visitor):
if not ast:
return
stack = [ast]
visited = set()
max_stack_size = 10000
while stack:
if len(stack) > max_stack_size:
break
node = stack.pop()
node_id = id(node)
if node_id in visited:
continue
visited.add(node_id)
visitor(node)
for key in reversed(node.__dict__.keys()):
value = getattr(node, key, None)
if isinstance(value, list):
for item in reversed(value):
if isinstance(item, esprima.nodes.Node) and id(item) not in visited:
item._parent = node
stack.append(item)
elif isinstance(value, esprima.nodes.Node) and id(value) not in visited:
value._parent = node
stack.append(value)
iterative_traverse(sub_ast, lambda n: collect_var_defs(n, var_defs))
last_resolved = None
def_line = None
if var_name in var_defs:
var_defs[var_name].sort(key=lambda x: x['line'], reverse=True)
for defn in var_defs[var_name]:
if 'btoa' not in defn['value'] and 'XOR_STR' not in defn['value'] and \
'doubleXOR' not in defn['value'] and 'singlebtoa' not in defn['value']:
last_resolved = defn['value']
def_line = defn['line']
break
if last_resolved:
resolved_vars_cache = {}
def resolve_var_recursive(expr, var_line):
try:
expr_ast = esprima.parseScript(expr, {'loc': True, 'range': True, 'tolerant': True})
except Exception:
return expr
vars_set = set()
def collect_identifiers(node):
if (hasattr(node, 'type') and node.type == 'Identifier' and hasattr(node, 'name')):
parent = getattr(node, '_parent', None)
if parent:
parent_type = parent.type if hasattr(parent, 'type') else None
if ((parent_type == 'MemberExpression' and hasattr(parent, 'property') and parent.property == node and not (hasattr(parent, 'computed') and parent.computed)) or node.name == 'window'):
return
vars_set.add(node.name)
def iterative_traverse_safe(ast, visitor):
if not ast:
return
stack = [ast]
visited = set()
while stack:
node = stack.pop()
node_id = id(node)
if node_id in visited:
continue
visited.add(node_id)
visitor(node)
for key in reversed(node.__dict__.keys()):
value = getattr(node, key, None)
if isinstance(value, list):
for item in reversed(value):
if isinstance(item, esprima.nodes.Node) and id(item) not in visited:
item._parent = node
stack.append(item)
elif isinstance(value, esprima.nodes.Node) and id(value) not in visited:
value._parent = node
stack.append(value)
iterative_traverse_safe(expr_ast, collect_identifiers)
if not vars_set:
return expr
for v in vars_set:
if v in resolved_vars_cache:
continue
def_value = v
if v in var_defs:
for defn in sorted(var_defs[v], key=lambda x: x['line'], reverse=True):
if defn['line'] < var_line and 'btoa' not in defn['value'] and 'XOR_STR' not in defn['value']:
def_value = defn['value']
break
resolved_vars_cache[v] = def_value
resolved_vars_cache[v] = resolve_var_recursive(def_value, var_line)
final_expr = expr
for k, v in resolved_vars_cache.items():
final_expr = re.sub(r'\b' + re.escape(k) + r'\b', str(v), final_expr)
return final_expr
last_resolved = resolve_var_recursive(last_resolved, def_line)
if last_resolved:
escaped_var_name = re.escape(var_name)
double_xor_pattern = re.compile(rf'XOR_STR\s*\(\s*{escaped_var_name}\s*,\s*{escaped_var_name}\s*\)')
xor_matches = double_xor_pattern.findall(code)
if xor_matches and len(xor_matches) >= 2:
last_resolved = f'doublexor({last_resolved})'
else:
usage_line_index = start_line - 1
search_start = max(0, usage_line_index - 10)
relevant_lines = '\n'.join(code_lines[search_start:usage_line_index + 1])
btoa_pattern = re.compile(rf'btoa\s*\(\s*""\s*\+\s*{escaped_var_name}\s*\)')
xor_var_pattern = re.compile(rf'XOR_STR\s*\(\s*{escaped_var_name}\s*,')
btoa_matches = btoa_pattern.findall(relevant_lines)
has_xor_var = bool(xor_var_pattern.search(relevant_lines))
if btoa_matches and len(btoa_matches) == 1 and not has_xor_var:
last_resolved = f'singlebtoa({last_resolved})'
return last_resolved
@staticmethod
def parse_assignments(code: str) -> Dict:
"""解析赋值语句"""
if esprima is None:
return {}
try:
ast = esprima.parseScript(code, loc=True, jsx=True)
except Exception:
return {}
stringify_calls = []
def traverse_node(node):
if isinstance(node, dict):
if node.get('type') == 'CallExpression':
callee = node.get('callee', {})
if (callee.get('type') == 'MemberExpression' and callee.get('object', {}).get('name') == 'JSON' and callee.get('property', {}).get('name') == 'stringify' and node.get('arguments') and node['arguments'][0]['type'] == 'Identifier'):
stringify_calls.append(node['arguments'][0]['name'])
for v in node.values():
traverse_node(v)
elif isinstance(node, list):
for item in node:
traverse_node(item)
traverse_node(ast.toDict())
last_stringify_arg = stringify_calls[-1] if stringify_calls else None
if not last_stringify_arg:
return {}
var_values = {}
def traverse_vars(node):
if isinstance(node, dict):
if node.get('type') == 'VariableDeclarator':
id_node = node.get('id', {})
init_node = node.get('init', {})
if (id_node.get('type') == 'Identifier' and init_node and init_node.get('type') in ('Literal', 'NumericLiteral', 'StringLiteral')):
var_values[id_node['name']] = init_node.get('value')
for v in node.values():
traverse_vars(v)
elif isinstance(node, list):
for item in node:
traverse_vars(item)
traverse_vars(ast.toDict())
assignments = {}
def traverse_assignments(node):
if isinstance(node, dict):
if node.get('type') == 'AssignmentExpression':
left = node.get('left', {})
right = node.get('right', {})
if (left.get('type') == 'MemberExpression' and left.get('object', {}).get('name') == last_stringify_arg and left.get('property', {}).get('type') == 'Identifier' and right.get('type') == 'Identifier' and node.get('loc')):
key_var = left['property']['name']
value = right['name']
key = var_values.get(key_var, key_var)
resolved_value = VMParser.find_var_definition(value, node['loc']['start']['line'], code) or value
assignments[key] = resolved_value
for v in node.values():
traverse_assignments(v)
elif isinstance(node, list):
for item in node:
traverse_assignments(item)
traverse_assignments(ast.toDict())
return assignments
@staticmethod
def get_xor_key(js_code: str) -> Optional[str]:
"""从代码中提取 XOR 密钥"""
if esprima is None:
return None
try:
parsed = esprima.parseScript(js_code, tolerant=True)
except Exception:
return None
last_xor_call = None
second_arg_node = None
for node in parsed.body:
if node.type == 'VariableDeclaration':
for decl in node.declarations:
if decl.init and decl.init.type == 'CallExpression':
call = decl.init
if call.callee.type == 'Identifier' and call.callee.name == 'XOR_STR':
last_xor_call = call
second_arg_node = call.arguments[1]
if not last_xor_call:
return None
if second_arg_node.type == 'Identifier':
var_name = second_arg_node.name
elif second_arg_node.type == 'Literal':
return second_arg_node.value
else:
return None
def find_value(nodes, name):
for node in nodes:
if node.type == 'VariableDeclaration':
for decl in node.declarations:
if decl.id.name == name and decl.init.type == 'Literal':
return decl.init.value
elif node.type == 'ExpressionStatement' and node.expression.type == 'AssignmentExpression':
expr = node.expression
if expr.left.type == 'Identifier' and expr.left.name == name and expr.right.type == 'Literal':
return expr.right.value
return None
return find_value(parsed.body, var_name)
@staticmethod
def parse_keys(decompiled_code: str) -> Tuple[str, Dict]:
"""解析键值对"""
assignments: dict = VMParser.parse_assignments(decompiled_code)
xor_key: str = VMParser.get_xor_key(decompiled_code)
parsed_keys: dict = {}
randomindex = 1
for key, value in assignments.items():
key = str(key)
if value.startswith("Array") and "location" not in value:
try:
numbers = value.split(') : ')[1].split(" + ")
num1 = float(numbers[0])
num2 = float(numbers[1])
parsed_keys[key] = str(float(num1 + num2))
except Exception:
pass
elif "location" in value:
parsed_keys[key] = "location"
elif "cfIpLongitude" in value:
parsed_keys[key] = "ipinfo"
elif "maxTouchPoints" in value:
parsed_keys[key] = "vendor"
elif "history" in value:
parsed_keys[key] = "history"
elif 'window["Object"]["keys"]' in value:
parsed_keys[key] = "localstorage"
elif 'createElement' in value:
parsed_keys[key] = "element"
elif value.isdigit():
parsed_keys[key] = value
elif "random" in value:
parsed_keys[key] = "random_" + str(randomindex)
randomindex += 1
elif "doublexor" in value:
parsed_keys[key] = value
elif "singlebtoa" in value:
parsed_keys[key] = value
return xor_key, parsed_keys
# ==========================================
# VM 字节码反编译器
# ==========================================
class VMDecompiler:
"""Turnstile VM 字节码反编译器"""
mapping: Dict[str, str] = {
"1": "XOR_STR", "2": "SET_VALUE", "3": "BTOA", "4": "BTOA_2", "5": "ADD_OR_PUSH",
"6": "ARRAY_ACCESS", "7": "CALL", "8": "COPY", "10": "window", "11": "GET_SCRIPT_SRC",
"12": "GET_MAP", "13": "TRY_CALL", "14": "JSON_PARSE", "15": "JSON_STRINGIFY", "17": "CALL_AND_SET",
"18": "ATOB", "19": "BTOA_3", "20": "IF_EQUAL_CALL", "21": "IF_DIFF_CALL", "22": "TEMP_STACK_CALL",
"23": "IF_DEFINED_CALL", "24": "BIND_METHOD", "27": "REMOVE_OR_SUBTRACT", "28": "undefined",
"25": "undefined", "26": "undefined", "29": "LESS_THAN", "31": "INCREMENT", "32": "DECREMENT_AND_EXEC",
"33": "MULTIPLY", "34": "MOVE"
}
functions: Dict[str, str] = {
"XOR_STR": """function XOR_STR(e, t) {
e = String(e);
t = String(t);
let n = "";
for (let r = 0; r < e.length; r++)
n += String.fromCharCode(e.charCodeAt(r) ^ t.charCodeAt(r % t.length));
return n;
}
"""
}
def __init__(self):
self.xorkey = ""
self.xorkey2 = ""
self.decompiled = ""
self.array_dict = {}
self.vg = 0
self.round1 = 0
self.found = False
self.potential = []
def _start(self):
"""重置状态"""
self.xorkey = ""
self.xorkey2 = ""
self.decompiled = "var mem = {};\n"
self.array_dict = {}
self.vg = 0
self.round1 = 0
self.found = False
self.potential = []
@staticmethod
def _xor_string(e: str, t: str) -> str:
"""XOR 字符串"""
n = ""
for r in range(len(e)):
n += chr(ord(e[r]) ^ ord(t[r % len(t)]))
return n
def _handle_operation(self, operation: str, args: List[str]):
"""处理单个操作"""
if operation == "COPY":
self.mapping[args[0]] = self.mapping.get(args[1], args[1])
if self.mapping.get(args[1]) != "window":
if self.mapping.get(args[1]) in self.functions and f"function {self.mapping.get(args[1])}" not in self.decompiled:
self.decompiled += self.functions[self.mapping[args[1]]] + "\n"
else:
var_name = str(args[1]).replace(".", "_")
self.decompiled += f"var var_{var_name} = window;\n"
self.array_dict[args[1]] = "window"
elif operation == "SET_VALUE":
var_name = str(args[0]).replace(".", "_")
value = args[1]
try:
num = float(value)
if num.is_integer():
self.decompiled += f"var var_{var_name} = {int(num)};\n"
self.array_dict[args[0]] = str(int(num))
else:
self.decompiled += f"var var_{var_name} = {num};\n"
self.array_dict[args[0]] = str(num)
except (ValueError, TypeError):
if isinstance(value, str):
if value == "[]":
self.decompiled += f"var var_{var_name} = [];\n"
self.array_dict[args[0]] = []
elif value == "None":
self.decompiled += f"var var_{var_name} = null;\n"
self.array_dict[args[0]] = "null"
else:
self.decompiled += f"var var_{var_name} = \"{value}\";\n"
self.array_dict[args[0]] = f"\"{value}\""
elif isinstance(value, list):
self.decompiled += f"var var_{var_name} = [];\n"
self.array_dict[args[0]] = []
elif value is None:
self.decompiled += f"var var_{var_name} = null;\n"
self.array_dict[args[0]] = "null"
else:
self.decompiled += f"var var_{var_name} = {value};\n"
self.array_dict[args[0]] = str(value)
elif operation == "ARRAY_ACCESS":
self._handle_array_access(args)
elif operation == "BIND_METHOD":
self._handle_bind_method(args)
elif operation == "XOR_STR":
if self.round1 == 1 and len(self.potential) < 2:
self.potential.append({"var": args[0], "key": args[1]})
var_name = str(args[0]).replace(".", "_")
key_name = str(args[1]).replace(".", "_")
self.decompiled += f"var var_{var_name} = XOR_STR(var_{var_name}, var_{key_name});\n"
elif operation == "BTOA_3":
var_name = str(args[0]).replace(".", "_")
self.decompiled += f"var var_{var_name} = btoa(\"\" + var_{var_name});\n"
elif operation == "CALL_AND_SET":
var_name = str(args[0]).replace(".", "_")
func_name = str(args[1]).replace(".", "_")
args_str = ", ".join(f"var_{arg.replace('.', '_')}" for arg in args[2:])
self.decompiled += f"var var_{var_name} = var_{func_name}({args_str});\n"
elif operation == "IF_DEFINED_CALL":
self._handle_if_defined_call(args)
elif operation == "CALL":
self._handle_call_operation(args)
elif operation == "ADD_OR_PUSH":
var_name = str(args[0]).replace(".", "_")
arg_name = str(args[1]).replace(".", "_")
self.decompiled += (f"var var_{var_name} = Array.isArray(var_{var_name}) ? " f"(var_{var_name}.push(var_{arg_name}), var_{var_name}) : var_{var_name} + var_{arg_name};\n")
elif operation == "IF_DIFF_CALL":
var_0 = str(args[0]).replace(".", "_")
var_1 = str(args[1]).replace(".", "_")
var_2 = str(args[2]).replace(".", "_")
if self.mapping.get(args[3]) == "COPY":
var_4 = str(args[4]).replace(".", "_")
var_5 = str(args[5]).replace(".", "_")
self.decompiled += (f"Math.abs(var_{var_0} - var_{var_1}) > var_{var_2} ? var_{var_4} = var_{var_5} : null;\n")
else:
args_str = ", ".join(f"var_{arg.replace('.', '_')}" for arg in args[4:])
self.decompiled += (f"Math.abs(var_{var_0} - var_{var_1}) > var_{var_2} ? {self.mapping.get(args[3], args[3])}({args_str}) : null;\n")
elif operation == "TRY_CALL":
self._handle_try_call(args)
elif operation == "JSON_STRINGIFY":
var_name = str(args[0]).replace(".", "_")
self.decompiled += f"var var_{var_name} = JSON.stringify(var_{var_name});\n"
elif operation == "MOVE":
self.decompiled += f"MOVE {args}"
else:
mapped = [self.mapping.get(key, "") for key in args[1:] if key in self.mapping]
unlabeled = [str(key) for key in args[1:] if key not in self.mapping]
all_values = " ".join(mapped + unlabeled)
self.decompiled += f"// UNKNOWN: {operation} -> {args[0]} {all_values};\n"
def _handle_try_call(self, args: List[str]):
target_var = f"var_{str(args[0]).replace('.', '_')}"
fn = self.mapping.get(args[1], "")
rest_args = [f"var_{str(a).replace('.', '_')}" for a in args[2:]]
if fn == "ARRAY_ACCESS":
self.decompiled += (f"try {{ mem[{rest_args[0]}] = {rest_args[1]}[{rest_args[0]}]; }} catch(r) {{ {target_var} = \"\" + r; }}\n")
else:
args_str = ", ".join(rest_args)
self.decompiled += (f"try {{ {fn}({args_str}); }} catch(r) {{ {target_var} = \"\" + r; }}\n")
def _handle_array_access(self, args: List[str]):
var_0 = str(args[0]).replace(".", "_")
var_1 = str(args[1]).replace(".", "_")
var_2 = str(args[2]).replace(".", "_")
if f"var var_{var_1} =" in self.decompiled:
if args[1] in self.array_dict or args[2] in self.array_dict:
if args[2] in self.array_dict and args[1] not in self.array_dict:
self.decompiled += f"var var_{var_0} = var_{var_1}[{self.array_dict[args[2]]}];\n"
elif args[1] in self.array_dict and args[2] not in self.array_dict:
self.decompiled += f"var var_{var_0} = {self.array_dict[args[1]]}[var_{var_2}];\n"
else:
if re.search(rf"var\s+var_{var_1}\s*=\s*\w+\([^)]*\)", self.decompiled):
self.decompiled += f"var var_{var_0} = var_{var_1}[{self.array_dict[args[2]]}];\n"
self.array_dict[args[0]] = f"var_{var_1}[{self.array_dict[args[2]]}]"
else:
self.decompiled += f"var var_{var_0} = {self.array_dict[args[1]]}[{self.array_dict[args[2]]}];\n"
self.array_dict[args[0]] = f"{self.array_dict[args[1]]}[{self.array_dict[args[2]]}]"
else:
self.decompiled += f"var var_{var_0} = var_{var_1}[var_{var_2}];\n"
else:
self.decompiled += f"var var_{var_0} = window[var_{var_2}];\n"
def _handle_bind_method(self, args: List[str]):
var_0 = str(args[0]).replace(".", "_")
var_1 = str(args[1]).replace(".", "_")
var_2 = str(args[2]).replace(".", "_")
if f"var var_{var_1} =" in self.decompiled:
if args[1] in self.array_dict or args[2] in self.array_dict:
if args[1] in self.array_dict and args[2] not in self.array_dict:
self.decompiled += (f"var var_{var_0} = {self.array_dict[args[1]]}[var_{var_2}].bind({self.array_dict[args[1]]});\n")
else:
if re.search(rf"var\s+var_{var_1}\s*=\s*\w+\([^)]*\)", self.decompiled):
self.decompiled += (f"var var_{var_0} = var_{var_1}[{self.array_dict[args[2]]}].bind(var_{var_1});\n")
self.array_dict[args[0]] = f"var_{var_1}[{self.array_dict[args[2]]}]"
else:
self.decompiled += (f"var var_{var_0} = {self.array_dict[args[1]]}[{self.array_dict[args[2]]}].bind({self.array_dict[args[1]]});\n")
self.array_dict[args[0]] = f"{self.array_dict[args[1]]}[{self.array_dict[args[2]]}]"
else:
self.decompiled += (f"var var_{var_0} = var_{var_1}[var_{var_2}].bind(var_{var_1});\n")
else:
self.decompiled += (f"var var_{var_0} = window[var_{var_2}].bind(var_{var_1});\n")
def _handle_if_defined_call(self, args: List[str]):
result = []
for item in args:
if item in self.mapping:
keys = [k for k, v in self.mapping.items() if v == self.mapping[item] and k != item]
result.append(keys[0] if keys else None)
else:
result.append(None)
result = [None if key is None else ([k for k, v in self.mapping.items() if v == self.mapping.get(key) and k != key] or [None])[0] for key in result]
if len(args) == 4:
target = str(args[3]).replace(".", "_")
count = len(re.findall(target, self.decompiled))
if count <= 1 and f"var var_{str(args[2]).replace('.', '_')}" not in self.decompiled:
if not self.xorkey:
self.xorkey = str(args[3])
var_0 = str(args[0]).replace(".", "_")
if self.mapping.get(result[1]) == "SET_VALUE":
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? (mem[\"{args[2]}\"] = \"{args[3]}\", var_{var_0}) : var_{var_0};\n")
else:
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? ({self.mapping.get(result[1], '')}(\"{args[2]}\", \"{args[3]}\") || var_{var_0}) : var_{var_0};\n")
elif count <= 3:
var_0 = str(args[0]).replace(".", "_")
arg_2 = str(args[2]).replace(".", "_")
if self.mapping.get(result[1]) == "SET_VALUE":
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? ((mem[\"{args[2]}\"] = \"{args[3]}\") || var_{var_0}) : var_{var_0};\n")
else:
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? ({self.mapping.get(result[1], '')}(var_{arg_2}, mem[\"{args[3]}\"]) || var_{var_0}) : var_{var_0};\n")
elif self.mapping.get(result[1]) == "JSON_PARSE":
var_0 = str(args[0]).replace(".", "_")
arg_3 = str(args[3]).replace(".", "_")
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? (JSON.parse(var_{arg_3}) || var_{var_0}) : var_{var_0};\n")
else:
var_0 = str(args[0]).replace(".", "_")
args_str = ", ".join(f"var_{arg.replace('.', '_')}" for arg in args[2:])
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? ({self.mapping.get(result[1], '')}({args_str}) || var_{var_0}) : var_{var_0};\n")
else:
var_0 = str(args[0]).replace(".", "_")
if len(args) > 4 and f"mem[\"{args[4]}\"] =" in self.decompiled:
args_str = ", ".join(f"mem[\"{arg}\"]" if i + 2 == 3 else f"var_{str(arg).replace('.', '_')}" for i, arg in enumerate(args[2:]))
if self.mapping.get(result[1]) == "CALL":
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? (var_{str(args[2]).replace('.', '_')}({args_str}) || var_{var_0}) : var_{var_0};\n")
else:
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? ({self.mapping.get(result[1], '')}({args_str}) || var_{var_0}) : var_{var_0};\n")
else:
args_str = ", ".join(f"var_{arg.replace('.', '_')}" for arg in args[2:])
if self.mapping.get(result[1]) == "ATOB":
arg_2 = str(args[2]).replace(".", "_")
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? (atob(\"\" + var_{arg_2}) || var_{var_0}) : var_{var_0};\n")
elif len(args) >= 3 and result[1] in self.mapping:
self.decompiled += (f"var var_{var_0} = var_{var_0} !== void 0 ? ({self.mapping.get(result[1], '')}({args_str}) || var_{var_0}) : var_{var_0};\n")
else:
self.decompiled += f"// ERROR: Invalid IF_DEFINED_CALL with args {args};\n"
def _handle_call_operation(self, args: List[str]):
if args[0] in self.mapping:
if self.mapping[args[0]] == "BTOA":
arg_1 = str(args[1]).replace(".", "_")
self.decompiled += f"console.log(btoa(\"\" + var_{arg_1}));\n"
else:
args_str = ", ".join(f"var_{arg.replace('.', '_')}" for arg in args)
self.decompiled += f"{self.mapping[args[0]]}({args_str});\n"
else:
if f"var var_{str(args[0]).replace('.', '_')} = \"set\";" in self.decompiled:
arg_1 = str(args[1]).replace(".", "_")
arg_2 = str(args[2]).replace(".", "_")
arg_3 = str(args[3]).replace(".", "_")
self.decompiled += f"var_{arg_1}[var_{arg_2}] = var_{arg_3};\n"
else:
args_str = ", ".join(f"var_{arg.replace('.', '_')}" for arg in args[1:])
self.decompiled += f"var_{str(args[0]).replace('.', '_')}({args_str});\n"
def _remove_unused_variables(self):
"""移除未使用的变量"""
lines = self.decompiled.split("\n")
used_vars = set()
var_decl_lines = []
for i, line in enumerate(lines):
match = re.match(r"^var\s+var_([\w_]+)\s*=", line)
if match:
var_decl_lines.append({"name": match.group(1), "index": i})
for var in var_decl_lines:
name = var["name"]
is_used = any(name in line and not line.startswith(f"var var_{name} =") for line in lines)
if is_used:
used_vars.add(name)
self.decompiled = "\n".join(
line for line in lines
if not re.match(r"^var\s+var_([\w_]+)\s*=", line) or re.match(r"^var\s+var_([\w_]+)\s*=", line).group(1) in used_vars
)
def _decompile_bytecode(self, bytecode: List):
"""反编译字节码"""
while len(bytecode) > 0:
e = str(bytecode[0][0])
t = [str(item) for item in bytecode[0][1:]]
bytecode.pop(0)
self.vg += 1
if e in self.mapping:
self._handle_operation(self.mapping[e], t)
else:
self.decompiled += f"// UNKNOWN_OPCODE {e} -> {', '.join(t)};\n"
if self.mapping.get(e) == "CALL" and not self.found:
for entry in self.potential:
if len(t) > 3 and entry["var"] == t[3]:
key_str = str(entry["key"]).replace(".", "_")
regex = rf"var var_{key_str} = (.*);"
match = re.search(regex, self.decompiled)
if match:
self.xorkey2 = match.group(1).replace(";", "")
self.found = True
break
if self.round1 == 0:
self.round1 += 1
self._decompile_round2()
def _decompile_round2(self):
"""第二轮反编译"""
matches = [m.group(2) for m in re.finditer(r"var\s+\w+\s*=\s*(['\"`])([\s\S]*?)\1", self.decompiled)]
bytecode = max(matches, key=len, default="")
if bytecode:
decoded = json.loads(self._xor_string(base64.b64decode(bytecode).decode(), str(self.xorkey)))
self._decompile_bytecode(decoded)
if self.round1 == 1:
self.round1 += 1
self._decompile_round3()
def _decompile_round3(self):
"""第三轮反编译"""
matches = [m.group(2) for m in re.finditer(r"var\s+\w+\s*=\s*(['\"`])([\s\S]*?)\1", self.decompiled)]
bytecode = next((s for s in matches if 60 <= len(s) <= 200), "")
if bytecode:
decoded = json.loads(self._xor_string(base64.b64decode(bytecode).decode(), str(self.xorkey)))
self._decompile_bytecode(decoded)
self._remove_unused_variables()
def decompile(self, turnstile: str, token: str) -> str:
"""
反编译 Turnstile VM 字节码
Args:
turnstile: Base64 编码的字节码
token: XOR 密钥
Returns:
反编译后的 JavaScript 代码
"""
self._start()
self.decompiled = (
"const { JSDOM } = require(\"jsdom\");\n"
"const dom = new JSDOM(\"<!DOCTYPE html><p>Hello world</p>\", { url: \"https://chatgpt.com/\" });\n"
"const window = dom.window;\n"
"var mem = {};\n"
)
self._decompile_bytecode(json.loads(self._xor_string(base64.b64decode(turnstile).decode(), str(token))))
return self.decompiled
# ==========================================
# Turnstile Token 生成器
# ==========================================
class TurnstileSolver:
"""Turnstile VM 求解器"""
html_object: str = json.dumps(
{"x":0,"y":1219,"width":37.8125,"height":30,"top":1219,"right":37.8125,"bottom":1249,"left":0},
separators=(',', ':')
)
@staticmethod
def xor(e: str, t: str) -> str:
"""XOR 两个字符串"""
t = str(t)
e = str(e)
n = ""
for r in range(len(e)):
n += chr(ord(e[r]) ^ ord(t[r % len(t)]))
return n
@staticmethod
def solve(turnstile_bytecode: str, p_token: str, ip_info: str) -> str:
"""
解决 Turnstile 挑战
Args:
turnstile_bytecode: dx 字节码
p_token: requirements token
ip_info: IP 信息字符串
Returns:
Turnstile token
"""
if esprima is None:
raise ImportError("esprima is required for Turnstile solving. Install with: pip install esprima")
if DEBUG:
print("[TurnstileSolver] Decompiling VM bytecode...")
decompiler = VMDecompiler()
decompiled: str = decompiler.decompile(turnstile_bytecode, p_token)
xor_key, keys = VMParser.parse_keys(decompiled)
if DEBUG:
print(f"[TurnstileSolver] Found {len(keys)} keys, xor_key={xor_key[:20] if xor_key else 'None'}...")
payload: dict = {}
for key, value in keys.items():
try:
value = float(value)
except Exception:
pass
if isinstance(value, float):
payload[key] = base64.b64encode(TurnstileSolver.xor(str(value), xor_key).encode("utf-8")).decode("utf-8")
elif "singlebtoa" in str(value):
payload[key] = base64.b64encode(value.split("singlebtoa(")[1].split(")")[0].encode("utf-8")).decode("utf-8")
elif "doublexor" in str(value):
number: str = value.split("doublexor(")[1].split(")")[0]
value_1: str = base64.b64encode(TurnstileSolver.xor(number, number).encode("utf-8")).decode("utf-8")
value_2: str = base64.b64encode(TurnstileSolver.xor(value_1, value_1).encode("utf-8")).decode("utf-8")
payload[key] = base64.b64encode(value_2.encode("utf-8")).decode("utf-8")
elif "ipinfo" in str(value):
payload[key] = base64.b64encode(TurnstileSolver.xor(ip_info, xor_key).encode("utf-8")).decode("utf-8")
elif "element" in str(value):
payload[key] = base64.b64encode(TurnstileSolver.xor(TurnstileSolver.html_object, xor_key).encode()).decode()
elif "location" in str(value):
location: str = 'https://chatgpt.com/'
payload[key] = base64.b64encode(TurnstileSolver.xor(location, xor_key).encode("utf-8")).decode("utf-8")
elif "random_1" in str(value):
random_value: float = random.random()
payload[key] = base64.b64encode(TurnstileSolver.xor(str(random_value), str(random_value)).encode("utf-8")).decode("utf-8")
elif "random_2" in str(value):
payload[key] = random.random()
elif "vendor" in str(value):
vendor_info: str = '["Google Inc.","Win32",8,0]'
payload[key] = base64.b64encode(TurnstileSolver.xor(vendor_info, xor_key).encode("utf-8")).decode("utf-8")
elif "localstorage" in str(value):
storage_keys = 'oai/apps/hasDismissedTeamsNoAuthUpsell,oai/apps/lastSeenNoAuthTrialsBannerAt,oai-did,oai/apps/noAuthGoUpsellModalDismissed,oai/apps/hasDismissedBusinessFreeTrialUpsellModal,oai/apps/capExpiresAt,statsig.session_id.1792610830,oai/apps/hasSeenNoAuthImagegenNux,oai/apps/lastPageLoadDate,client-correlated-secret,statsig.stable_id.1792610830,oai/apps/debugSettings,oai/apps/hasDismissedPlusFreeTrialUpsellModal,oai/apps/tatertotInContextUpsellBannerV2,search.attributions-settings'
payload[key] = base64.b64encode(TurnstileSolver.xor(storage_keys, xor_key).encode("utf-8")).decode("utf-8")
elif "history" in str(value):
payload[key] = base64.b64encode(TurnstileSolver.xor(str(random.randint(1, 5)), xor_key).encode()).decode()
turnstile_token: str = base64.b64encode(
TurnstileSolver.xor(json.dumps(payload, separators=(',', ':')), xor_key).encode("utf-8")
).decode("utf-8")
if DEBUG:
print(f"[TurnstileSolver] ✓ Generated token: {turnstile_token[:50]}...")
return turnstile_token
# ==========================================
# 配置数组生成器
# ==========================================
class ConfigGenerator:
"""生成 Sentinel 需要的配置数组"""
@staticmethod
def create_config(
prod_version: str,
device_id: str,
ip_info: Optional[List[str]] = None,
user_agent: Optional[str] = None
) -> List:
"""
创建配置数组
Args:
prod_version: 产品版本号
device_id: 设备 ID
ip_info: IP 信息列表 [ip, city, region, lat, lng, timezone]
user_agent: 用户代理
Returns:
18 元素配置数组
"""
if user_agent is None:
user_agent = FINGERPRINT_CONFIG['user_agent']
if ip_info is None:
ip_info = ["0.0.0.0", "Unknown", "Unknown", "0", "0", "UTC"]
# 时区处理
try:
tz_info = ZoneInfo(ip_info[5])
except Exception:
tz_info = datetime.timezone.utc
# 生成随机 React 字符串
def rand_str():
n = random.random()
base36 = ''
chars = '0123456789abcdefghijklmnopqrstuvwxyz'
x = int(n * 36**10)
for _ in range(10):
x, r = divmod(x, 36)
base36 = chars[r] + base36
return base36
reacts = ["location", "__reactContainer$" + rand_str(), "_reactListening" + rand_str()]
window_keys = [
"0", "window", "self", "document", "name", "location", "customElements",
"history", "navigation", "locationbar", "menubar", "personalbar",
"scrollbars", "statusbar", "toolbar", "status", "closed", "frames",
"length", "top", "opener", "parent", "frameElement", "navigator",
"origin", "external", "screen", "innerWidth", "innerHeight", "scrollX",
"pageXOffset", "scrollY", "pageYOffset", "visualViewport", "screenX",
"screenY", "outerWidth", "outerHeight", "devicePixelRatio", "event",
"clientInformation", "screenLeft", "screenTop", "styleMedia", "onsearch",
"trustedTypes", "performance", "onappinstalled", "onbeforeinstallprompt",
"crypto", "indexedDB", "sessionStorage", "localStorage", "chrome",
"__oai_SSR_HTML", "__reactRouterContext", "$RC", "__oai_SSR_TTI",
"__reactRouterManifest", "__reactRouterVersion", "DD_RUM",
"__REACT_INTL_CONTEXT__", "regeneratorRuntime", "DD_LOGS", "__STATSIG__",
"__mobxInstanceCount", "__mobxGlobals", "_g", "__reactRouterRouteModules",
"__SEGMENT_INSPECTOR__", "__reactRouterDataRouter", "MotionIsMounted",
"_oaiHandleSessionExpired"
]
start_time = int(time.time() * 1000)
return [
FINGERPRINT_CONFIG['screen_width'] + FINGERPRINT_CONFIG['screen_height'], # [0] screen dimensions
datetime.datetime.now(tz_info).strftime(f"%a %b %d %Y %H:%M:%S GMT%z ({datetime.datetime.now(tz_info).tzname()})"), # [1] timestamp
4294705152, # [2] memory
random.random(), # [3] nonce (placeholder, will be updated)
user_agent, # [4] user agent
None, # [5] script src
prod_version, # [6] build ID
"en-US", # [7] language
"en-US,en", # [8] languages
random.random(), # [9] elapsed time (placeholder)
"webkitGetUserMediafunction webkitGetUserMedia() { [native code] }", # [10] navigator property
random.choice(reacts), # [11] react key
random.choice(window_keys), # [12] window key
random.randint(800, 1400) + random.random(), # [13] performance.now()
device_id, # [14] session UUID
"", # [15] URL params
FINGERPRINT_CONFIG['hardware_concurrency'], # [16] hardware concurrency
start_time # [17] timeOrigin
]
# ==========================================
# 统一接口
# ==========================================
class NativeSentinelSolver:
"""
纯 Python Sentinel 求解器的统一接口
替代需要 Node.js 的 JSExecutor + SentinelSolver
"""
def __init__(self, device_id: str = None):
self.device_id = device_id or str(uuid.uuid4())
self.ip_info = None
self.prod_version = None
self._config = None
def set_context(self, prod_version: str, ip_info: List[str] = None):
"""设置上下文信息"""
self.prod_version = prod_version
self.ip_info = ip_info
self._config = ConfigGenerator.create_config(
prod_version=prod_version,
device_id=self.device_id,
ip_info=ip_info
)
def get_config(self) -> List:
"""获取当前配置数组"""
if self._config is None:
raise ValueError("Please call set_context() first")
return self._config.copy()
def generate_requirements_token(self) -> str:
"""生成 requirements token"""
config = self.get_config()
return PowSolver.generate_requirements_token(config)
def solve_pow(self, seed: str, difficulty: str) -> str:
"""解决 PoW 挑战"""
config = self.get_config()
result = PowSolver.solve(seed, difficulty, config)
if result is None:
raise RuntimeError("Failed to solve PoW challenge")
return result
def solve_turnstile(self, dx_bytecode: str, p_token: str) -> str:
"""解决 Turnstile 挑战"""
ip_str = str(self.ip_info[:-1]) if self.ip_info else "[]"
return TurnstileSolver.solve(dx_bytecode, p_token, ip_str)
def solve_enforcement(self, enforcement_config: Dict, p_token: str = None) -> Dict:
"""
解决完整的 enforcement 挑战
Args:
enforcement_config: 服务器返回的挑战配置
p_token: requirements token (用于 Turnstile)
Returns:
{'proof': 'gAAAAAB...', 'turnstile': '...'}
"""
pow_data = enforcement_config.get('proofofwork', {})
# 1. 解决 PoW
seed = pow_data.get('seed')
difficulty = pow_data.get('difficulty')
if not seed or not difficulty:
raise ValueError("Missing seed or difficulty in enforcement config")
proof_token = self.solve_pow(seed, difficulty)
result = {
'proof': proof_token,
'turnstile': None
}
# 2. 解决 Turnstile (如果有)
turnstile_data = pow_data.get('turnstile')
if turnstile_data and turnstile_data.get('dx') and p_token:
result['turnstile'] = self.solve_turnstile(turnstile_data['dx'], p_token)
return result
# ==========================================
# 便捷函数
# ==========================================
def solve_pow_simple(seed: str, difficulty: str, device_id: str = None) -> str:
"""
简单的 PoW 求解
Args:
seed: 挑战 seed
difficulty: 难度
device_id: 设备 ID
Returns:
PoW token
"""
solver = NativeSentinelSolver(device_id)
solver.set_context(prod_version="unknown", ip_info=None)
return solver.solve_pow(seed, difficulty)
def generate_requirements_token_simple(device_id: str = None, prod_version: str = "unknown") -> str:
"""
简单生成 requirements token
Args:
device_id: 设备 ID
prod_version: 产品版本
Returns:
Requirements token
"""
solver = NativeSentinelSolver(device_id)
solver.set_context(prod_version=prod_version)
return solver.generate_requirements_token()