151 lines
5.8 KiB
Python
151 lines
5.8 KiB
Python
import logging
|
||
import re
|
||
from time import sleep
|
||
|
||
import random
|
||
import requests
|
||
# 假设 ProxyRotator 是外部定义的或者是同文件导入的
|
||
from .proxy import ProxyRotator
|
||
# 假设 OLD_ANDROID_USER_AGENTS 是一个包含旧版安卓 UA 的全局列表
|
||
from .constants import OLD_ANDROID_USER_AGENTS
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
class RecaptchaRequestor:
|
||
def __init__(self, timeout, proxies):
|
||
self.timeout = timeout
|
||
# 初始化代理轮换器,这是为了防止IP被封
|
||
self.proxy_rotator = ProxyRotator(proxies)
|
||
self.base_url = 'https://www.google.com/recaptcha'
|
||
|
||
def _random_headers(self):
|
||
# 关键点:伪装成旧版安卓手机
|
||
user_agent = random.choice(OLD_ANDROID_USER_AGENTS)
|
||
return {
|
||
'Content-Type': 'application/x-www-form-urlencoded',
|
||
'User-Agent': user_agent
|
||
}
|
||
|
||
def _request(self, method, endpoint, **kwargs):
|
||
# 构造完整 URL
|
||
url = f"{self.base_url}{endpoint}"
|
||
|
||
# 设置默认超时和 Header
|
||
kwargs.setdefault('timeout', self.timeout)
|
||
kwargs['headers'] = self._random_headers()
|
||
|
||
# 每次请求都切换代理 IP
|
||
kwargs['proxies'] = self.proxy_rotator.get()
|
||
|
||
try:
|
||
# 发送请求
|
||
logger.debug('Recaptcha request %s %s params=%s', method, url, kwargs.get('params'))
|
||
response = requests.request(method, url, **kwargs)
|
||
response.raise_for_status()
|
||
logger.debug('Recaptcha response %s %s status=%s', method, url, response.status_code)
|
||
return response.text
|
||
except requests.RequestException as exc:
|
||
logger.warning('Recaptcha request failed %s %s: %s', method, url, exc)
|
||
return None
|
||
|
||
def fetch_anchor_token(self, api_type, params):
|
||
# 请求具体的 api_type (api2/enterprise) anchor 接口获取初始 Session
|
||
return self._request('GET', f'/{api_type}/anchor', params=params)
|
||
|
||
def fetch_recaptcha_token(self, api_type, s_params, payload):
|
||
# 请求 "/reload" 接口提交答案
|
||
# payload 包含了上一条日志中生成的加密数据
|
||
text = self._request('POST', f'/{api_type}/reload', params={'k': s_params.get('k')}, data=payload)
|
||
|
||
if text:
|
||
# 从返回的 JSON/文本中提取最终的 "rresp" Token
|
||
match = re.search(r'"rresp","(.*?)"', text)
|
||
if match:
|
||
return match.group(1)
|
||
return None
|
||
|
||
|
||
class RecaptchaSolverSync:
|
||
|
||
MAX_RETRIES = 20
|
||
RETRY_DELAY = 1
|
||
|
||
def __init__(self, timeout, proxies):
|
||
# 依赖外部的一个 Requestor 类来发送 HTTP 请求
|
||
self.client = RecaptchaRequestor(timeout=timeout, proxies=proxies)
|
||
|
||
@staticmethod
|
||
def _parse_api_type(anchor_url):
|
||
# 正则分析 URL,判断是标准版 (api2) 还是企业版 (enterprise)
|
||
match = re.search(r'(api2|enterprise)/anchor\?(.*)', anchor_url)
|
||
if match:
|
||
return match.group(1), match.group(2)
|
||
return None, None
|
||
|
||
@staticmethod
|
||
def _extract_c_value(html):
|
||
# 关键步骤:从 Google 返回的 HTML 中提取 "c" 值(Session Token)
|
||
match = re.search(r'value="(.*?)"', html)
|
||
if match:
|
||
return match.group(1)
|
||
return None
|
||
|
||
@staticmethod
|
||
def _parse_params(param_str):
|
||
# 解析 URL 查询字符串成字典,例如 "k=xxx&co=yyy" → {'k': 'xxx', 'co': 'yyy'}
|
||
params = {}
|
||
if param_str:
|
||
for pair in param_str.split('&'):
|
||
if '=' in pair:
|
||
key, value = pair.split('=', 1)
|
||
params[key] = value
|
||
return params
|
||
|
||
def _build_payload(self, s_params, c_value):
|
||
# 构造发送给 Recaptcha 服务器的数据包
|
||
# 包含了版本号(v)、原因(reason=q)、Token(c)、Sitekey(k) 等
|
||
return f"v={s_params.get('v')}&reason=q&c={c_value}&k={s_params.get('k')}&co={s_params.get('co')}"
|
||
|
||
def solve(self, anchor_url):
|
||
api_type, param_str = self._parse_api_type(anchor_url)
|
||
if not param_str:
|
||
logger.error('Invalid anchor URL provided: %s', anchor_url)
|
||
raise ValueError('Invalid anchor URL format.')
|
||
|
||
s_params = self._parse_params(param_str) # 解析参数字典
|
||
|
||
logger.debug('Recaptcha solve start api_type=%s params=%s', api_type, s_params)
|
||
|
||
# 重试循环:尝试最多 20 次
|
||
for attempt in range(1, self.MAX_RETRIES + 1):
|
||
logger.debug('Recaptcha attempt %d/%d', attempt, self.MAX_RETRIES)
|
||
# 1. 请求 Anchor(复选框页面)
|
||
anchor_token_html = self.client.fetch_anchor_token(api_type, s_params)
|
||
if not anchor_token_html:
|
||
logger.debug('Anchor response empty, retrying...')
|
||
|
||
sleep(self.RETRY_DELAY)
|
||
continue
|
||
|
||
# 2. 提取 Session Token ("c" value)
|
||
c_value = self._extract_c_value(anchor_token_html)
|
||
if not c_value:
|
||
logger.debug('Failed to extract c value from anchor response.')
|
||
sleep(self.RETRY_DELAY)
|
||
continue
|
||
|
||
# 3. 构造最终请求载荷
|
||
payload = self._build_payload(s_params, c_value)
|
||
logger.debug('Payload prepared with keys: %s', list(s_params.keys()))
|
||
|
||
# 4. 请求最终的 Pass Token
|
||
token = self.client.fetch_recaptcha_token(api_type, s_params, payload)
|
||
|
||
if token:
|
||
logger.info('Recaptcha solved in %d attempt(s).', attempt)
|
||
return token # 成功拿到 Token!
|
||
|
||
sleep(self.RETRY_DELAY)
|
||
|
||
logger.error('Failed to solve reCAPTCHA after %d attempts.', self.MAX_RETRIES)
|
||
raise RuntimeError('Failed to solve reCAPTCHA after maximum retries.') |