加入了bot,加入了代理池适配
This commit is contained in:
dela
2026-01-07 16:42:47 +08:00

422
dabing.py Normal file
View File

@@ -0,0 +1,422 @@
import json
import random
import string
from typing import Optional, Dict, Tuple
import time
import httpx
import re
import logging
import sys
import datetime
# --- 配置日志 ---
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# 如果留空,就是直连(不然容易 429
PROXY_URL = None
# PROXY_URL = "http://username:password@ip:port"
# --- 随机身份生成器 ---
class RandomIdentity:
def __init__(self):
self.first_names = ["KELLY","RYAN"]
self.last_names = ["BURKHART","Pitts"]
random_num = random.randint(0, 1)
middle_name = ""
for i in range(random.randint(5, 10)):
middle_name += random.choice(string.ascii_uppercase)
self.first_name = self.first_names[random_num] + " " + middle_name
self.last_name = self.last_names[random_num]
# 生日在这两个里面选择1981-05-14 1985-10-01使用上面的random_num来选择
self.birth_date = "1981-05-14" if random_num == 0 else "1985-10-01"
# 随机退伍日期 (最近3年)
d_year = random.randint(2025, 2025)
d_month = random.randint(1, 12)
d_day = random.randint(1, 28)
self.discharge_date = f"{d_year}-{d_month:02d}-{d_day:02d}"
# --- 随机指纹生成器 ---
class RandomFingerprint:
def __init__(self):
self.chrome_version = random.randint(120, 133)
self.build_version = random.randint(0, 5000)
self.patch_version = random.randint(0, 150)
self.platform = "Windows" if random.random() < 0.8 else "Linux"
self.platform_os = '"Windows"' if self.platform == "Windows" else '"Linux"'
if self.platform == "Windows":
self.ua_string = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36'
else:
self.ua_string = f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36'
# 生成设备指纹哈希 (32位十六进制字符串)
self.device_fingerprint_hash = ''.join(random.choice('0123456789abcdef') for _ in range(32))
def get_headers(self) -> Dict:
return {
'sec-ch-ua': f'"Chromium";v="{self.chrome_version}", "Not(A:Brand";v="24", "Google Chrome";v="{self.chrome_version}"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': self.platform_os,
'user-agent': self.ua_string
}
# --- 辅助函数 ---
def extract_verification_link(content: str) -> str | None:
"""从邮件内容提取 SheerID 验证链接"""
if not content:
return None
# 优先匹配 href 属性中的链接
match = re.search(r'href="(https://services\.sheerid\.com/verify/[^"]+emailToken=[^"]+)"', content)
if match:
return match.group(1).replace('&amp;', '&')
# 备用:直接匹配 URL
match = re.search(r'https://services\.sheerid\.com/verify/[^\s<>"]+emailToken=\d+', content)
if match:
return match.group(0)
return None
def extract_email_token(url: str) -> str | None:
"""从验证链接提取 emailToken"""
if not url:
return None
match = re.search(r'emailToken=(\d+)', url)
return match.group(1) if match else None
class MailClient:
"""
邮件客户端类,用于与临时邮箱 API 交互。
"""
def __init__(self, base_url: str = "https://mail.copy.qzz.io/api",
auth_token: str = ''):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {auth_token}'}
self.http_client = httpx.Client(timeout=30.0)
def __del__(self):
if hasattr(self, "http_client"):
self.http_client.close()
def generate_email(self, length: int = 10, domain_index: int = 0) -> str | None:
"""生成一个新的临时邮箱地址"""
try:
response = self.http_client.get(
f"{self.base_url}/generate",
headers=self.headers,
params={'length': length, 'domainIndex': domain_index}
)
if response.status_code != 200:
logger.error(f"生成邮箱失败 ({response.status_code}): {response.text}")
return None
return response.json().get('email')
except Exception as e:
logger.error(f"生成邮箱请求失败: {e}")
return None
def get_emails(self, mailbox: str) -> list | None:
"""获取指定邮箱的邮件列表"""
try:
response = self.http_client.get(
f"{self.base_url}/emails",
headers=self.headers,
params={'mailbox': mailbox}
)
if response.status_code != 200:
logger.error(f"获取邮件列表失败 ({response.status_code}): {response.text}")
return None
return response.json()
except Exception as e:
logger.error(f"获取邮件列表请求失败: {e}")
return None
def get_email_detail(self, email_id: int) -> str | None:
"""获取指定邮件的详细内容"""
try:
response = self.http_client.get(
f"{self.base_url}/email/{email_id}",
headers=self.headers
)
if response.status_code != 200:
logger.error(f"获取邮件详情失败 ({response.status_code}): {response.text}")
return None
data = response.json()
return data.get('html_content') or data.get('content')
except Exception as e:
logger.error(f"获取邮件详情请求失败: {e}")
return None
def delete_mailbox(self, address: str) -> bool:
"""删除指定的邮箱"""
if not address:
return False
try:
response = self.http_client.delete(
f"{self.base_url}/mailboxes",
headers=self.headers,
params={'address': address}
)
if response.status_code not in [200, 204]:
logger.error(f"删除邮箱失败 ({response.status_code}): {response.text}")
return False
return True
except Exception as e:
logger.error(f"删除邮箱请求失败: {e}")
return False
# --- 主逻辑类 ---
class SheerIDVerifier:
def __init__(self, verification_id: str, program_id:str) -> None:
self.program_id = program_id
self.verification_id = verification_id
self.fingerprint = RandomFingerprint()
self.device_fingerprint_hash = self.fingerprint.device_fingerprint_hash
self.identity = RandomIdentity()
logger.info(f"指纹初始化: Chrome {self.fingerprint.chrome_version}")
logger.info(f"身份初始化: {self.identity.first_name} {self.identity.last_name}, DOB: {self.identity.birth_date}")
# 配置代理
proxies = None
if PROXY_URL:
proxies = PROXY_URL
logger.info("已启用代理连接")
# 修复httpx 使用 'proxy' 而不是 'proxies'
self.http_client = httpx.Client(
timeout=30.0,
follow_redirects=True,
http2=False,
proxy=proxies
)
def __del__(self):
if hasattr(self, "http_client"):
self.http_client.close()
@staticmethod
def parse_verification_id(url: str) -> Optional[str]:
match = re.search(r"verificationId=([a-f0-9]+)", url, re.IGNORECASE)
return match.group(1) if match else None
@staticmethod
def parse_program_id(url: str) -> Optional[str]:
match = re.search(r"verify/([a-f0-9]+)/", url, re.IGNORECASE)
return match.group(1) if match else None
def _get_common_headers(self, referer_vid: str = None) -> Dict:
vid = referer_vid if referer_vid else self.verification_id
base_headers = {
'host': 'services.sheerid.com',
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-US,en;q=0.9',
'clientname': 'jslib',
'clientversion': '2.157.0',
'origin': 'https://services.sheerid.com',
'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={vid}',
'sec-fetch-dest': 'empty',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'priority': 'u=1, i',
}
base_headers.update(self.fingerprint.get_headers())
return base_headers
def verify(self, email_address: str) -> Dict:
try:
logger.info(f"使用邮箱: {email_address}")
time.sleep(random.uniform(1.0, 2.5))
# --- 步骤 0: 握手 ---
logger.info(">>> 步骤 0: 初始化会话...")
init_url = f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}"
init_headers = {
'host': 'services.sheerid.com',
'upgrade-insecure-requests': '1',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
}
init_headers.update(self.fingerprint.get_headers())
self.http_client.get(init_url, headers=init_headers)
time.sleep(random.uniform(0.5, 1.5))
# --- 步骤 1: 提交 Status ---
logger.info(">>> 步骤 1: 提交 Status...")
step1_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus"
resp1 = self.http_client.post(step1_url, json={'status': 'VETERAN'}, headers=self._get_common_headers())
if resp1.status_code != 200:
raise Exception(f"步骤 1 失败 ({resp1.status_code}): {resp1.text}")
data1 = resp1.json()
time.sleep(random.uniform(0.5, 1.5))
# --- 步骤 2: 提交个人信息 ---
submissionUrl = data1.get("submissionUrl")
current_vid = self.verification_id
new_vid_match = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE)
if new_vid_match:
new_vid = new_vid_match.group(1)
if new_vid != self.verification_id:
current_vid = new_vid
verify_body = {
"firstName": self.identity.first_name,
"lastName": self.identity.last_name,
"birthDate": self.identity.birth_date,
"email": email_address,
"phoneNumber": "",
"organization": {"id": 4070, "name": "Army"},
"dischargeDate": self.identity.discharge_date,
'deviceFingerprintHash': self.device_fingerprint_hash,
"locale": "en-US",
"country": "US",
"metadata": {
"marketConsentValue": False,
"refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={current_vid}",
"verificationId": current_vid,
"flags": "{\"doc-upload-considerations\":\"default\",\"doc-upload-may24\":\"default\",\"doc-upload-redesign-use-legacy-message-keys\":false,\"docUpload-assertion-checklist\":\"default\",\"include-cvec-field-france-student\":\"not-labeled-optional\",\"org-search-overlay\":\"default\",\"org-selected-display\":\"default\"}",
"submissionOptIn": "By submitting..."
}
}
logger.info(f">>> 步骤 2: 提交个人信息 ({self.identity.first_name} {self.identity.last_name})...")
resp2 = self.http_client.post(submissionUrl, json=verify_body, headers=self._get_common_headers(current_vid))
if resp2.status_code == 429:
logger.error("🛑 触发 429 风控! 建议更换 IP 或暂停运行。")
raise Exception("Rate Limit / Verification Limit Exceeded",resp2.text)
if resp2.status_code != 200:
logger.error(f"步骤 2 响应: {resp2.text}")
raise Exception(f"步骤 2 失败 ({resp2.status_code})")
data2 = resp2.json()
if data2.get("currentStep") == "error":
raise Exception(f"逻辑错误: {data2.get('errorIds')} - {data2.get('systemErrorMessage')}")
logger.info(f"步骤 2 成功! Current Step: {data2.get('currentStep')}")
logger.info(f"\n✅ 验证邮件已发送到: {email_address}")
logger.info("请检查邮箱并点击验证链接完成验证!")
return {
"success": True,
"email": email_address,
"message": "验证邮件已发送,请检查邮箱"
}
except Exception as e:
logger.error(f"❌ 失败: {e}")
return {"success": False, "message": str(e)}
# 增加一个验证邮箱的token的步骤
def verify_email_token(self, email_token):
try:
logger.info(">>> 步骤 3: 验证邮箱 token...")
step3_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/emailLoop"
resp3 = self.http_client.post(step3_url, json={"emailToken": email_token, "deviceFingerprintHash": self.device_fingerprint_hash}, headers=self._get_common_headers())
if resp3.status_code != 200:
raise Exception(f"步骤 3 失败 ({resp3.status_code}): {resp3.text}")
data3 = resp3.json()
time.sleep(random.uniform(0.5, 1.5))
if data3.get("currentStep") == "error":
raise Exception(f"逻辑错误: {data3.get('errorIds')} - {data3.get('systemErrorMessage')}")
logger.info(f"步骤 3 成功! Current Step: {data3.get('currentStep')}")
return {"success": True, "message": "邮箱 token 验证成功"}
except Exception as e:
logger.error(f"❌ 失败: {e}")
return {"success": False, "message": str(e)}
def main():
print("=" * 50)
print("SheerID 验证工具")
print("=" * 50)
# 获取邮箱地址
# email_address = input("请输入您的邮箱地址: ").strip()
mail = MailClient()
email_address = mail.generate_email()
if not email_address:
print("错误: 邮箱地址不能为空")
return
# 获取验证URL
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = input("请输入验证 URL: ").strip()
vid = SheerIDVerifier.parse_verification_id(url)
pid = SheerIDVerifier.parse_program_id(url)
if not vid or not pid:
print("URL 解析失败")
return
print("\n开始验证流程...\n")
verifier = SheerIDVerifier(vid, pid)
res = verifier.verify(email_address)
print("\n" + "="*50)
if res['success']:
print(f"{res.get('message')}")
print(f"邮箱: {res.get('email')}")
time.sleep(20)
# 轮询等待 SheerID 验证邮件
emails = None
while True:
emails = mail.get_emails(email_address)
if emails and len(emails) > 0 and emails[0].get('sender') == "Verify@SheerID.com":
print("收到验证邮件")
break
print("等待验证邮件...")
time.sleep(5)
# 获取邮件内容
content = mail.get_email_detail(emails[0].get('id'))
verification_link = extract_verification_link(content) # 提取验证链接
email_token = extract_email_token(verification_link) # 从链接中提取 token
email_result = verifier.verify_email_token(email_token)
# 判断是否成功
if email_result['success']:
print("✅ 邮箱 token 验证成功")
logger.info(email_result)
else:
print("❌ 邮箱 token 验证失败")
logger.error(email_result)
mail.delete_mailbox(email_address)
else:
print(f"❌ 失败: {res.get('message')}")
print("="*50)
if __name__ == '__main__':
main()