diff --git a/dabing.py b/dabing.py new file mode 100644 index 0000000..212d4c8 --- /dev/null +++ b/dabing.py @@ -0,0 +1,422 @@ +import json +import random +import string +from typing import Optional, Dict, Tuple +import time +import httpx +import re +import logging +import sys +import datetime + +# --- 配置日志 --- +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + + +# 如果留空,就是直连(不然容易 429) +PROXY_URL = None +# PROXY_URL = "http://username:password@ip:port" + +# --- 随机身份生成器 --- +class RandomIdentity: + def __init__(self): + self.first_names = ["KELLY","RYAN"] + self.last_names = ["BURKHART","Pitts"] + + random_num = random.randint(0, 1) + middle_name = "" + for i in range(random.randint(5, 10)): + middle_name += random.choice(string.ascii_uppercase) + self.first_name = self.first_names[random_num] + " " + middle_name + self.last_name = self.last_names[random_num] + + # 生日在这两个里面选择1981-05-14 1985-10-01,使用上面的random_num来选择 + self.birth_date = "1981-05-14" if random_num == 0 else "1985-10-01" + + # 随机退伍日期 (最近3年) + d_year = random.randint(2025, 2025) + d_month = random.randint(1, 12) + d_day = random.randint(1, 28) + self.discharge_date = f"{d_year}-{d_month:02d}-{d_day:02d}" + +# --- 随机指纹生成器 --- +class RandomFingerprint: + def __init__(self): + self.chrome_version = random.randint(120, 133) + self.build_version = random.randint(0, 5000) + self.patch_version = random.randint(0, 150) + self.platform = "Windows" if random.random() < 0.8 else "Linux" + self.platform_os = '"Windows"' if self.platform == "Windows" else '"Linux"' + + if self.platform == "Windows": + self.ua_string = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36' + else: + self.ua_string = f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36' + + # 生成设备指纹哈希 (32位十六进制字符串) + self.device_fingerprint_hash = ''.join(random.choice('0123456789abcdef') for _ in range(32)) + + def get_headers(self) -> Dict: + return { + 'sec-ch-ua': f'"Chromium";v="{self.chrome_version}", "Not(A:Brand";v="24", "Google Chrome";v="{self.chrome_version}"', + 'sec-ch-ua-mobile': '?0', + 'sec-ch-ua-platform': self.platform_os, + 'user-agent': self.ua_string + } + +# --- 辅助函数 --- +def extract_verification_link(content: str) -> str | None: + """从邮件内容提取 SheerID 验证链接""" + if not content: + return None + # 优先匹配 href 属性中的链接 + match = re.search(r'href="(https://services\.sheerid\.com/verify/[^"]+emailToken=[^"]+)"', content) + if match: + return match.group(1).replace('&', '&') + # 备用:直接匹配 URL + match = re.search(r'https://services\.sheerid\.com/verify/[^\s<>"]+emailToken=\d+', content) + if match: + return match.group(0) + return None + + +def extract_email_token(url: str) -> str | None: + """从验证链接提取 emailToken""" + if not url: + return None + match = re.search(r'emailToken=(\d+)', url) + return match.group(1) if match else None + + +class MailClient: + """ + 邮件客户端类,用于与临时邮箱 API 交互。 + """ + + def __init__(self, base_url: str = "https://mail.copy.qzz.io/api", + auth_token: str = ''): + self.base_url = base_url + self.headers = {'Authorization': f'Bearer {auth_token}'} + self.http_client = httpx.Client(timeout=30.0) + + def __del__(self): + if hasattr(self, "http_client"): + self.http_client.close() + + def generate_email(self, length: int = 10, domain_index: int = 0) -> str | None: + """生成一个新的临时邮箱地址""" + try: + response = self.http_client.get( + f"{self.base_url}/generate", + headers=self.headers, + params={'length': length, 'domainIndex': domain_index} + ) + if response.status_code != 200: + logger.error(f"生成邮箱失败 ({response.status_code}): {response.text}") + return None + return response.json().get('email') + except Exception as e: + logger.error(f"生成邮箱请求失败: {e}") + return None + + def get_emails(self, mailbox: str) -> list | None: + """获取指定邮箱的邮件列表""" + try: + response = self.http_client.get( + f"{self.base_url}/emails", + headers=self.headers, + params={'mailbox': mailbox} + ) + if response.status_code != 200: + logger.error(f"获取邮件列表失败 ({response.status_code}): {response.text}") + return None + return response.json() + except Exception as e: + logger.error(f"获取邮件列表请求失败: {e}") + return None + + def get_email_detail(self, email_id: int) -> str | None: + """获取指定邮件的详细内容""" + try: + response = self.http_client.get( + f"{self.base_url}/email/{email_id}", + headers=self.headers + ) + if response.status_code != 200: + logger.error(f"获取邮件详情失败 ({response.status_code}): {response.text}") + return None + data = response.json() + return data.get('html_content') or data.get('content') + except Exception as e: + logger.error(f"获取邮件详情请求失败: {e}") + return None + + def delete_mailbox(self, address: str) -> bool: + """删除指定的邮箱""" + if not address: + return False + try: + response = self.http_client.delete( + f"{self.base_url}/mailboxes", + headers=self.headers, + params={'address': address} + ) + if response.status_code not in [200, 204]: + logger.error(f"删除邮箱失败 ({response.status_code}): {response.text}") + return False + return True + except Exception as e: + logger.error(f"删除邮箱请求失败: {e}") + return False + + +# --- 主逻辑类 --- +class SheerIDVerifier: + def __init__(self, verification_id: str, program_id:str) -> None: + self.program_id = program_id + self.verification_id = verification_id + self.fingerprint = RandomFingerprint() + self.device_fingerprint_hash = self.fingerprint.device_fingerprint_hash + self.identity = RandomIdentity() + + logger.info(f"指纹初始化: Chrome {self.fingerprint.chrome_version}") + logger.info(f"身份初始化: {self.identity.first_name} {self.identity.last_name}, DOB: {self.identity.birth_date}") + + # 配置代理 + proxies = None + if PROXY_URL: + proxies = PROXY_URL + logger.info("已启用代理连接") + + # 修复:httpx 使用 'proxy' 而不是 'proxies' + self.http_client = httpx.Client( + timeout=30.0, + follow_redirects=True, + http2=False, + proxy=proxies + ) + + def __del__(self): + if hasattr(self, "http_client"): + self.http_client.close() + + @staticmethod + def parse_verification_id(url: str) -> Optional[str]: + match = re.search(r"verificationId=([a-f0-9]+)", url, re.IGNORECASE) + return match.group(1) if match else None + + @staticmethod + def parse_program_id(url: str) -> Optional[str]: + match = re.search(r"verify/([a-f0-9]+)/", url, re.IGNORECASE) + return match.group(1) if match else None + + def _get_common_headers(self, referer_vid: str = None) -> Dict: + vid = referer_vid if referer_vid else self.verification_id + base_headers = { + 'host': 'services.sheerid.com', + 'accept': 'application/json, text/plain, */*', + 'accept-language': 'en-US,en;q=0.9', + 'clientname': 'jslib', + 'clientversion': '2.157.0', + 'origin': 'https://services.sheerid.com', + 'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={vid}', + 'sec-fetch-dest': 'empty', + 'sec-fetch-mode': 'cors', + 'sec-fetch-site': 'same-origin', + 'priority': 'u=1, i', + } + base_headers.update(self.fingerprint.get_headers()) + return base_headers + + def verify(self, email_address: str) -> Dict: + try: + logger.info(f"使用邮箱: {email_address}") + time.sleep(random.uniform(1.0, 2.5)) + + # --- 步骤 0: 握手 --- + logger.info(">>> 步骤 0: 初始化会话...") + init_url = f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}" + + init_headers = { + 'host': 'services.sheerid.com', + 'upgrade-insecure-requests': '1', + 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', + 'sec-fetch-dest': 'document', + 'sec-fetch-mode': 'navigate', + 'sec-fetch-site': 'none', + 'sec-fetch-user': '?1', + } + init_headers.update(self.fingerprint.get_headers()) + + self.http_client.get(init_url, headers=init_headers) + time.sleep(random.uniform(0.5, 1.5)) + + # --- 步骤 1: 提交 Status --- + logger.info(">>> 步骤 1: 提交 Status...") + step1_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus" + + resp1 = self.http_client.post(step1_url, json={'status': 'VETERAN'}, headers=self._get_common_headers()) + if resp1.status_code != 200: + raise Exception(f"步骤 1 失败 ({resp1.status_code}): {resp1.text}") + + data1 = resp1.json() + time.sleep(random.uniform(0.5, 1.5)) + + # --- 步骤 2: 提交个人信息 --- + submissionUrl = data1.get("submissionUrl") + current_vid = self.verification_id + + new_vid_match = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE) + if new_vid_match: + new_vid = new_vid_match.group(1) + if new_vid != self.verification_id: + current_vid = new_vid + + verify_body = { + "firstName": self.identity.first_name, + "lastName": self.identity.last_name, + "birthDate": self.identity.birth_date, + "email": email_address, + "phoneNumber": "", + "organization": {"id": 4070, "name": "Army"}, + "dischargeDate": self.identity.discharge_date, + 'deviceFingerprintHash': self.device_fingerprint_hash, + "locale": "en-US", + "country": "US", + "metadata": { + "marketConsentValue": False, + "refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={current_vid}", + "verificationId": current_vid, + "flags": "{\"doc-upload-considerations\":\"default\",\"doc-upload-may24\":\"default\",\"doc-upload-redesign-use-legacy-message-keys\":false,\"docUpload-assertion-checklist\":\"default\",\"include-cvec-field-france-student\":\"not-labeled-optional\",\"org-search-overlay\":\"default\",\"org-selected-display\":\"default\"}", + "submissionOptIn": "By submitting..." + } + } + + logger.info(f">>> 步骤 2: 提交个人信息 ({self.identity.first_name} {self.identity.last_name})...") + resp2 = self.http_client.post(submissionUrl, json=verify_body, headers=self._get_common_headers(current_vid)) + + if resp2.status_code == 429: + logger.error("🛑 触发 429 风控! 建议更换 IP 或暂停运行。") + raise Exception("Rate Limit / Verification Limit Exceeded",resp2.text) + + if resp2.status_code != 200: + logger.error(f"步骤 2 响应: {resp2.text}") + raise Exception(f"步骤 2 失败 ({resp2.status_code})") + + data2 = resp2.json() + if data2.get("currentStep") == "error": + raise Exception(f"逻辑错误: {data2.get('errorIds')} - {data2.get('systemErrorMessage')}") + + logger.info(f"步骤 2 成功! Current Step: {data2.get('currentStep')}") + logger.info(f"\n✅ 验证邮件已发送到: {email_address}") + logger.info("请检查邮箱并点击验证链接完成验证!") + + + + return { + "success": True, + "email": email_address, + "message": "验证邮件已发送,请检查邮箱" + } + except Exception as e: + logger.error(f"❌ 失败: {e}") + return {"success": False, "message": str(e)} + + # 增加一个验证邮箱的token的步骤 + def verify_email_token(self, email_token): + try: + logger.info(">>> 步骤 3: 验证邮箱 token...") + step3_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/emailLoop" + + resp3 = self.http_client.post(step3_url, json={"emailToken": email_token, "deviceFingerprintHash": self.device_fingerprint_hash}, headers=self._get_common_headers()) + if resp3.status_code != 200: + raise Exception(f"步骤 3 失败 ({resp3.status_code}): {resp3.text}") + + data3 = resp3.json() + time.sleep(random.uniform(0.5, 1.5)) + + if data3.get("currentStep") == "error": + raise Exception(f"逻辑错误: {data3.get('errorIds')} - {data3.get('systemErrorMessage')}") + logger.info(f"步骤 3 成功! Current Step: {data3.get('currentStep')}") + return {"success": True, "message": "邮箱 token 验证成功"} + except Exception as e: + logger.error(f"❌ 失败: {e}") + return {"success": False, "message": str(e)} + +def main(): + print("=" * 50) + print("SheerID 验证工具") + print("=" * 50) + + # 获取邮箱地址 + # email_address = input("请输入您的邮箱地址: ").strip() + mail = MailClient() + + email_address = mail.generate_email() + if not email_address: + print("错误: 邮箱地址不能为空") + return + + # 获取验证URL + if len(sys.argv) > 1: + url = sys.argv[1] + else: + url = input("请输入验证 URL: ").strip() + + vid = SheerIDVerifier.parse_verification_id(url) + pid = SheerIDVerifier.parse_program_id(url) + + if not vid or not pid: + print("URL 解析失败") + return + + print("\n开始验证流程...\n") + verifier = SheerIDVerifier(vid, pid) + res = verifier.verify(email_address) + + print("\n" + "="*50) + if res['success']: + print(f"✅ {res.get('message')}") + print(f"邮箱: {res.get('email')}") + + time.sleep(20) + + # 轮询等待 SheerID 验证邮件 + emails = None + while True: + emails = mail.get_emails(email_address) + if emails and len(emails) > 0 and emails[0].get('sender') == "Verify@SheerID.com": + print("收到验证邮件") + break + print("等待验证邮件...") + time.sleep(5) + + # 获取邮件内容 + content = mail.get_email_detail(emails[0].get('id')) + + verification_link = extract_verification_link(content) # 提取验证链接 + email_token = extract_email_token(verification_link) # 从链接中提取 token + + email_result = verifier.verify_email_token(email_token) + # 判断是否成功 + if email_result['success']: + print("✅ 邮箱 token 验证成功") + logger.info(email_result) + else: + print("❌ 邮箱 token 验证失败") + logger.error(email_result) + + mail.delete_mailbox(email_address) + + else: + print(f"❌ 失败: {res.get('message')}") + print("="*50) + + + +if __name__ == '__main__': + main()