Files
gptVeteran/autoDabing.py
2026-01-07 16:40:28 +08:00

594 lines
24 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import random
import string
from typing import Optional, Dict, Tuple
import time
import httpx
import re
import logging
import sys
import datetime
import os
# --- 配置日志 ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- 代理池配置 ---
PROXY_POOL_ENABLED = os.getenv("PROXY_POOL_ENABLED", "1").strip().lower() not in {"0", "false", "no", "off"} # 是否启用代理池
PROXY_POOL_API = os.getenv("PROXY_POOL_API", "https://getproxy.mygoband.com/v1/proxies/next").strip()
PROXY_POOL_TOKEN = os.getenv("PROXY_POOL_TOKEN", "").strip() # 你的 API key不需要带 Bearer 前缀)
PROXY_POOL_AUTH = f"Bearer {PROXY_POOL_TOKEN}" if PROXY_POOL_TOKEN else "" # Authorization: Bearer <token>
# 静态代理(当代理池不可用/关闭时作为兜底);留空则直连
PROXY_URL = os.getenv("PROXY_URL", "").strip() or None
# 例: http://username:password@ip:port
# --- 代理池管理器 ---
class ProxyPool:
"""
代理池管理器,从代理服务端点获取随机代理
"""
def __init__(self, api_url: str, auth_token: str):
self.api_url = api_url
self.auth_token = auth_token
self.current_proxy = None
def get_proxy(self) -> str | None:
"""从代理池API获取一个新的代理"""
try:
# 解析URL
if not self.auth_token:
logger.error("获取代理失败: 未设置 PROXY_POOL_TOKENAuthorization 为空)")
return None
headers = {'Authorization': self.auth_token}
# 使用 httpx 发起请求
with httpx.Client(timeout=10.0) as client:
res = client.get(self.api_url, headers=headers)
if res.status_code != 200:
logger.error(f"获取代理失败 ({res.status_code}): {res.text}")
return None
# 解析响应
response_data = res.json()
# 提取代理信息
# 响应格式: {"proxy": {"protocol": "http", "host": "x.x.x.x", "port": 443, "username": "user", "password": "pass"}, "lease_id": "...", "ttl_ms": 59994}
if 'proxy' in response_data and isinstance(response_data['proxy'], dict):
proxy_info = response_data['proxy']
protocol = proxy_info.get('protocol', 'http')
proxy_host = proxy_info.get('host')
proxy_port = proxy_info.get('port')
username = proxy_info.get('username')
password = proxy_info.get('password')
# 强制使用 http 协议,不使用 https
protocol = 'http'
if proxy_host and proxy_port:
# 如果有用户名和密码,添加到代理 URL 中
if username and password:
proxy = f"{protocol}://{username}:{password}@{proxy_host}:{proxy_port}"
logger.info(f"代理需要认证: {username}@{proxy_host}:{proxy_port}")
else:
proxy = f"{protocol}://{proxy_host}:{proxy_port}"
else:
logger.error(f"代理信息不完整: {proxy_info}")
return None
elif 'proxy' in response_data and isinstance(response_data['proxy'], str):
# 如果是字符串格式(可能已经包含认证信息)
proxy = response_data['proxy']
# 强制替换 https 为 http
if proxy.startswith('https://'):
proxy = proxy.replace('https://', 'http://', 1)
elif 'host' in response_data:
# 兼容旧格式
proxy_host = response_data.get('host')
proxy_port = response_data.get('port')
username = response_data.get('username', '')
password = response_data.get('password', '')
if username and password:
proxy = f"http://{username}:{password}@{proxy_host}:{proxy_port}"
else:
proxy = f"http://{proxy_host}:{proxy_port}"
else:
# 如果是纯文本格式的代理地址
proxy = res.text.strip()
# 强制替换 https 为 http
if proxy.startswith('https://'):
proxy = proxy.replace('https://', 'http://', 1)
self.current_proxy = proxy
logger.info(f"获取到新代理: {proxy}")
return proxy
except Exception as e:
logger.error(f"获取代理请求失败: {e}")
return None
def refresh_proxy(self) -> str | None:
"""刷新代理(获取新的代理)"""
return self.get_proxy()
# --- 随机身份生成器 ---
class RandomIdentity:
def __init__(self):
self.first_names = ["KELLY","RYAN"]
self.last_names = ["BURKHART","Pitts"]
random_num = random.randint(0, 1)
middle_name = ""
for i in range(random.randint(5, 10)):
middle_name += random.choice(string.ascii_uppercase)
self.first_name = self.first_names[random_num] + " " + middle_name
self.last_name = self.last_names[random_num]
# 生日在这两个里面选择1981-05-14 1985-10-01使用上面的random_num来选择
self.birth_date = "1981-05-14" if random_num == 0 else "1985-10-01"
# 随机退伍日期 (最近3年)
d_year = random.randint(2025, 2025)
d_month = random.randint(1, 12)
d_day = random.randint(1, 28)
self.discharge_date = f"{d_year}-{d_month:02d}-{d_day:02d}"
# --- 随机指纹生成器 ---
class RandomFingerprint:
def __init__(self):
self.chrome_version = random.randint(120, 133)
self.build_version = random.randint(0, 5000)
self.patch_version = random.randint(0, 150)
self.platform = "Windows" if random.random() < 0.8 else "Linux"
self.platform_os = '"Windows"' if self.platform == "Windows" else '"Linux"'
if self.platform == "Windows":
self.ua_string = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36'
else:
self.ua_string = f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36'
# 生成设备指纹哈希 (32位十六进制字符串)
self.device_fingerprint_hash = ''.join(random.choice('0123456789abcdef') for _ in range(32))
def get_headers(self) -> Dict:
return {
'sec-ch-ua': f'"Chromium";v="{self.chrome_version}", "Not(A:Brand";v="24", "Google Chrome";v="{self.chrome_version}"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': self.platform_os,
'user-agent': self.ua_string
}
# --- 辅助函数 ---
def extract_verification_link(content: str) -> str | None:
"""从邮件内容提取 SheerID 验证链接"""
if not content:
return None
# 优先匹配 href 属性中的链接
match = re.search(r'href="(https://services\.sheerid\.com/verify/[^"]+emailToken=[^"]+)"', content)
if match:
return match.group(1).replace('&amp;', '&')
# 备用:直接匹配 URL
match = re.search(r'https://services\.sheerid\.com/verify/[^\s<>"]+emailToken=\d+', content)
if match:
return match.group(0)
return None
def extract_email_token(url: str) -> str | None:
"""从验证链接提取 emailToken"""
if not url:
return None
match = re.search(r'emailToken=(\d+)', url)
return match.group(1) if match else None
class MailClient:
"""
邮件客户端类,用于与临时邮箱 API 交互。
邮箱操作使用直连,不使用代理
"""
def __init__(self, base_url: str = "https://mail.copy.qzz.io/api",
auth_token: str = 'ks4cPQfAvMQI1iqhFCDJTGpuc5ez6B95Iaf9SzA47MP0LiH5Pv7urHjjvVsHJlCY'):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {auth_token}'}
# 邮箱操作使用直连,不使用代理
self.http_client = httpx.Client(timeout=30.0)
def __del__(self):
if hasattr(self, "http_client"):
self.http_client.close()
def generate_email(self, length: int = 10, domain_index: int = 0) -> str | None:
"""生成一个新的临时邮箱地址"""
try:
response = self.http_client.get(
f"{self.base_url}/generate",
headers=self.headers,
params={'length': length, 'domainIndex': domain_index}
)
if response.status_code != 200:
logger.error(f"生成邮箱失败 ({response.status_code}): {response.text}")
return None
return response.json().get('email')
except Exception as e:
logger.error(f"生成邮箱请求失败: {e}")
return None
def get_emails(self, mailbox: str) -> list | None:
"""获取指定邮箱的邮件列表"""
try:
response = self.http_client.get(
f"{self.base_url}/emails",
headers=self.headers,
params={'mailbox': mailbox}
)
if response.status_code != 200:
logger.error(f"获取邮件列表失败 ({response.status_code}): {response.text}")
return None
return response.json()
except Exception as e:
logger.error(f"获取邮件列表请求失败: {e}")
return None
def get_email_detail(self, email_id: int) -> str | None:
"""获取指定邮件的详细内容"""
try:
response = self.http_client.get(
f"{self.base_url}/email/{email_id}",
headers=self.headers
)
if response.status_code != 200:
logger.error(f"获取邮件详情失败 ({response.status_code}): {response.text}")
return None
data = response.json()
return data.get('html_content') or data.get('content')
except Exception as e:
logger.error(f"获取邮件详情请求失败: {e}")
return None
def delete_mailbox(self, address: str) -> bool:
"""删除指定的邮箱"""
if not address:
return False
try:
response = self.http_client.delete(
f"{self.base_url}/mailboxes",
headers=self.headers,
params={'address': address}
)
if response.status_code not in [200, 204]:
logger.error(f"删除邮箱失败 ({response.status_code}): {response.text}")
return False
return True
except Exception as e:
logger.error(f"删除邮箱请求失败: {e}")
return False
# --- 主逻辑类 ---
class SheerIDVerifier:
def __init__(self, verification_id: str, program_id: str, proxy_pool: ProxyPool = None, log_callback=None) -> None:
self.program_id = program_id
self.verification_id = verification_id
self.fingerprint = RandomFingerprint()
self.device_fingerprint_hash = self.fingerprint.device_fingerprint_hash
self.identity = RandomIdentity()
self.proxy_pool = proxy_pool
self.log_callback = log_callback
self.log(f"指纹初始化: Chrome {self.fingerprint.chrome_version}")
self.log(f"身份初始化: {self.identity.first_name} {self.identity.last_name}, DOB: {self.identity.birth_date}")
# 配置代理 - 获取一个代理并在整个验证过程中使用同一个代理
self.proxy_url = None
if self.proxy_pool:
self.proxy_url = self.proxy_pool.get_proxy()
if self.proxy_url:
self.log(f"已启用代理池连接: {self.proxy_url}")
else:
logger.warning("代理池获取失败,使用静态代理或直连")
if not self.proxy_url and PROXY_URL:
self.proxy_url = PROXY_URL
self.log("已启用静态代理连接")
# 修复httpx 使用 'proxy' 而不是 'proxies'
self.http_client = httpx.Client(
timeout=30.0,
follow_redirects=True,
http2=False,
proxy=self.proxy_url
)
def __del__(self):
if hasattr(self, "http_client"):
self.http_client.close()
def log(self, msg):
logger.info(msg)
if self.log_callback:
try:
self.log_callback(msg)
except Exception:
pass
@staticmethod
def parse_verification_id(url: str) -> Optional[str]:
match = re.search(r"verificationId=([a-f0-9]+)", url, re.IGNORECASE)
return match.group(1) if match else None
@staticmethod
def parse_program_id(url: str) -> Optional[str]:
match = re.search(r"verify/([a-f0-9]+)/", url, re.IGNORECASE)
return match.group(1) if match else None
def _get_common_headers(self, referer_vid: str = None) -> Dict:
vid = referer_vid if referer_vid else self.verification_id
base_headers = {
'host': 'services.sheerid.com',
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-US,en;q=0.9',
'clientname': 'jslib',
'clientversion': '2.157.0',
'origin': 'https://services.sheerid.com',
'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={vid}',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'priority': 'u=1, i',
}
base_headers.update(self.fingerprint.get_headers())
return base_headers
def verify(self, email_address: str) -> Dict:
try:
self.log(f"使用邮箱: {email_address}")
time.sleep(random.uniform(1.0, 2.5))
# --- 步骤 0: 握手 ---
self.log(">>> 步骤 0: 初始化会话...")
init_url = f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}"
init_headers = {
'host': 'services.sheerid.com',
'upgrade-insecure-requests': '1',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
}
init_headers.update(self.fingerprint.get_headers())
self.http_client.get(init_url, headers=init_headers)
time.sleep(random.uniform(0.5, 1.5))
# --- 步骤 1: 提交 Status ---
self.log(">>> 步骤 1: 提交 Status...")
step1_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus"
resp1 = self.http_client.post(step1_url, json={'status': 'VETERAN'}, headers=self._get_common_headers())
if resp1.status_code != 200:
raise Exception(f"步骤 1 失败 ({resp1.status_code}): {resp1.text}")
data1 = resp1.json()
time.sleep(random.uniform(0.5, 1.5))
# --- 步骤 2: 提交个人信息 ---
submissionUrl = data1.get("submissionUrl")
current_vid = self.verification_id
new_vid_match = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE)
if new_vid_match:
new_vid = new_vid_match.group(1)
if new_vid != self.verification_id:
current_vid = new_vid
verify_body = {
"firstName": self.identity.first_name,
"lastName": self.identity.last_name,
"birthDate": self.identity.birth_date,
"email": email_address,
"phoneNumber": "",
"organization": {"id": 4070, "name": "Army"},
"dischargeDate": self.identity.discharge_date,
'deviceFingerprintHash': self.device_fingerprint_hash,
"locale": "en-US",
"country": "US",
"metadata": {
"marketConsentValue": False,
"refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={current_vid}",
"verificationId": current_vid,
"flags": "{\"doc-upload-considerations\":\"default\",\"doc-upload-may24\":\"default\",\"doc-upload-redesign-use-legacy-message-keys\":false,\"docUpload-assertion-checklist\":\"default\",\"include-cvec-field-france-student\":\"not-labeled-optional\",\"org-search-overlay\":\"default\",\"org-selected-display\":\"default\"}",
"submissionOptIn": "By submitting..."
}
}
self.log(f">>> 步骤 2: 提交个人信息 ({self.identity.first_name} {self.identity.last_name})...")
resp2 = self.http_client.post(submissionUrl, json=verify_body, headers=self._get_common_headers(current_vid))
if resp2.status_code == 429:
self.log("🛑 触发 429 风控! 建议更换 IP 或暂停运行。")
raise Exception("Rate Limit / Verification Limit Exceeded",resp2.text)
if resp2.status_code != 200:
self.log(f"步骤 2 响应: {resp2.text}")
raise Exception(f"步骤 2 失败 ({resp2.status_code})")
data2 = resp2.json()
if data2.get("currentStep") == "error":
raise Exception(f"逻辑错误: {data2.get('errorIds')} - {data2.get('systemErrorMessage')}")
self.log(f"步骤 2 成功! Current Step: {data2.get('currentStep')}")
self.log(f"\n✅ 验证邮件已发送到: {email_address}")
self.log("请检查邮箱并点击验证链接完成验证!")
return {
"success": True,
"email": email_address,
"message": "验证邮件已发送,请检查邮箱"
}
except Exception as e:
logger.error(f"❌ 失败: {e}")
# 删除邮箱
if hasattr(self, 'mail'):
self.mail.delete_mailbox(email_address)
self.log(f"邮箱 {email_address} 已删除")
return {"success": False, "message": str(e)}
# 增加一个验证邮箱的token的步骤
def verify_email_token(self, email_token):
try:
self.log(">>> 步骤 3: 验证邮箱 token...")
step3_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/emailLoop"
resp3 = self.http_client.post(step3_url, json={"emailToken": email_token, "deviceFingerprintHash": self.device_fingerprint_hash}, headers=self._get_common_headers())
if resp3.status_code != 200:
raise Exception(f"步骤 3 失败 ({resp3.status_code}): {resp3.text}")
data3 = resp3.json()
time.sleep(random.uniform(0.5, 1.5))
if data3.get("currentStep") == "error":
raise Exception(f"逻辑错误: {data3.get('errorIds')} - {data3.get('systemErrorMessage')}")
self.log(f"步骤 3 成功! Current Step: {data3.get('currentStep')}")
return {"success": True, "message": "邮箱 token 验证成功"}
except Exception as e:
logger.error(f"❌ 失败: {e}")
return {"success": False, "message": str(e)}
def process_verification(verification_url: str, log_callback=None) -> Dict:
"""
执行完整的验证流程
"""
def log(msg):
logger.info(msg)
if log_callback:
try:
log_callback(msg)
except Exception:
pass
try:
# 初始化代理池
proxy_pool = None
if PROXY_POOL_ENABLED:
log("启用代理池模式")
if not PROXY_POOL_AUTH:
log("⚠️ 代理池已启用但未设置 PROXY_POOL_TOKEN自动跳过代理池改用静态代理或直连")
else:
proxy_pool = ProxyPool(PROXY_POOL_API, PROXY_POOL_AUTH)
# 获取邮箱地址
mail = MailClient()
email_address = mail.generate_email()
if not email_address:
return {"success": False, "message": "错误: 邮箱地址生成失败"}
# 解析 URL
vid = SheerIDVerifier.parse_verification_id(verification_url)
pid = SheerIDVerifier.parse_program_id(verification_url)
if not vid or not pid:
return {"success": False, "message": "URL 解析失败"}
log("\n开始验证流程...\n")
verifier = SheerIDVerifier(vid, pid, proxy_pool=proxy_pool, log_callback=log_callback)
res = verifier.verify(email_address)
if res['success']:
log(f"{res.get('message')}")
log(f"邮箱: {res.get('email')}")
# 轮询等待 SheerID 验证邮件
emails = None
max_retries = 20 # 最大重试次数 (约 100 秒)
retry_count = 0
while retry_count < max_retries:
time.sleep(5)
retry_count += 1
log(f"等待验证邮件... ({retry_count}/{max_retries})")
emails = mail.get_emails(email_address)
if emails and len(emails) > 0:
# 检查是否有来自 SheerID 的邮件
sheerid_email = next((e for e in emails if e.get('sender') == "Verify@SheerID.com"), None)
if sheerid_email:
log("收到验证邮件")
emails = [sheerid_email] # 只处理这封邮件
break
if not emails or len(emails) == 0:
mail.delete_mailbox(email_address)
return {"success": False, "message": "超时未收到验证邮件"}
# 获取邮件内容
content = mail.get_email_detail(emails[0].get('id'))
verification_link = extract_verification_link(content)
email_token = extract_email_token(verification_link)
if not email_token:
mail.delete_mailbox(email_address)
return {"success": False, "message": "无法提取 email token"}
# 验证邮箱 token
email_result = verifier.verify_email_token(email_token)
mail.delete_mailbox(email_address)
if email_result['success']:
log("✅ 邮箱 token 验证成功")
return {"success": True, "message": "验证流程全部完成", "email": email_address, "details": email_result}
else:
log("❌ 邮箱 token 验证失败")
return {"success": False, "message": "邮箱 token 验证失败", "details": email_result}
else:
return {"success": False, "message": res.get('message')}
except Exception as e:
logger.error(f"验证过程发生异常: {e}")
return {"success": False, "message": str(e)}
def main():
print("=" * 50)
print("SheerID 验证工具")
print("=" * 50)
# 获取验证URL
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = input("请输入验证 URL: ").strip()
if not url:
print("URL 不能为空")
return
result = process_verification(url)
print("\n" + "="*50)
print(f"最终结果: {'成功' if result['success'] else '失败'}")
print(result.get('message'))
print("="*50)
if __name__ == '__main__':
main()