import json import random import string from typing import Optional, Dict, Tuple import time import httpx import re import logging import sys import datetime # --- 配置日志 --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # 如果留空,就是直连(不然容易 429) PROXY_URL = None # PROXY_URL = "http://username:password@ip:port" # --- 随机身份生成器 --- class RandomIdentity: def __init__(self): self.first_names = ["KELLY","RYAN"] self.last_names = ["BURKHART","Pitts"] random_num = random.randint(0, 1) middle_name = "" for i in range(random.randint(5, 10)): middle_name += random.choice(string.ascii_uppercase) self.first_name = self.first_names[random_num] + " " + middle_name self.last_name = self.last_names[random_num] # 生日在这两个里面选择1981-05-14 1985-10-01,使用上面的random_num来选择 self.birth_date = "1981-05-14" if random_num == 0 else "1985-10-01" # 随机退伍日期 (最近3年) d_year = random.randint(2025, 2025) d_month = random.randint(1, 12) d_day = random.randint(1, 28) self.discharge_date = f"{d_year}-{d_month:02d}-{d_day:02d}" # --- 随机指纹生成器 --- class RandomFingerprint: def __init__(self): self.chrome_version = random.randint(120, 133) self.build_version = random.randint(0, 5000) self.patch_version = random.randint(0, 150) self.platform = "Windows" if random.random() < 0.8 else "Linux" self.platform_os = '"Windows"' if self.platform == "Windows" else '"Linux"' if self.platform == "Windows": self.ua_string = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36' else: self.ua_string = f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36' # 生成设备指纹哈希 (32位十六进制字符串) self.device_fingerprint_hash = ''.join(random.choice('0123456789abcdef') for _ in range(32)) def get_headers(self) -> Dict: return { 'sec-ch-ua': f'"Chromium";v="{self.chrome_version}", "Not(A:Brand";v="24", "Google Chrome";v="{self.chrome_version}"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': self.platform_os, 'user-agent': self.ua_string } # --- 辅助函数 --- def extract_verification_link(content: str) -> str | None: """从邮件内容提取 SheerID 验证链接""" if not content: return None # 优先匹配 href 属性中的链接 match = re.search(r'href="(https://services\.sheerid\.com/verify/[^"]+emailToken=[^"]+)"', content) if match: return match.group(1).replace('&', '&') # 备用:直接匹配 URL match = re.search(r'https://services\.sheerid\.com/verify/[^\s<>"]+emailToken=\d+', content) if match: return match.group(0) return None def extract_email_token(url: str) -> str | None: """从验证链接提取 emailToken""" if not url: return None match = re.search(r'emailToken=(\d+)', url) return match.group(1) if match else None class MailClient: """ 邮件客户端类,用于与临时邮箱 API 交互。 """ def __init__(self, base_url: str = "https://mail.copy.qzz.io/api", auth_token: str = ''): self.base_url = base_url self.headers = {'Authorization': f'Bearer {auth_token}'} self.http_client = httpx.Client(timeout=30.0) def __del__(self): if hasattr(self, "http_client"): self.http_client.close() def generate_email(self, length: int = 10, domain_index: int = 0) -> str | None: """生成一个新的临时邮箱地址""" try: response = self.http_client.get( f"{self.base_url}/generate", headers=self.headers, params={'length': length, 'domainIndex': domain_index} ) if response.status_code != 200: logger.error(f"生成邮箱失败 ({response.status_code}): {response.text}") return None return response.json().get('email') except Exception as e: logger.error(f"生成邮箱请求失败: {e}") return None def get_emails(self, mailbox: str) -> list | None: """获取指定邮箱的邮件列表""" try: response = self.http_client.get( f"{self.base_url}/emails", headers=self.headers, params={'mailbox': mailbox} ) if response.status_code != 200: logger.error(f"获取邮件列表失败 ({response.status_code}): {response.text}") return None return response.json() except Exception as e: logger.error(f"获取邮件列表请求失败: {e}") return None def get_email_detail(self, email_id: int) -> str | None: """获取指定邮件的详细内容""" try: response = self.http_client.get( f"{self.base_url}/email/{email_id}", headers=self.headers ) if response.status_code != 200: logger.error(f"获取邮件详情失败 ({response.status_code}): {response.text}") return None data = response.json() return data.get('html_content') or data.get('content') except Exception as e: logger.error(f"获取邮件详情请求失败: {e}") return None def delete_mailbox(self, address: str) -> bool: """删除指定的邮箱""" if not address: return False try: response = self.http_client.delete( f"{self.base_url}/mailboxes", headers=self.headers, params={'address': address} ) if response.status_code not in [200, 204]: logger.error(f"删除邮箱失败 ({response.status_code}): {response.text}") return False return True except Exception as e: logger.error(f"删除邮箱请求失败: {e}") return False # --- 主逻辑类 --- class SheerIDVerifier: def __init__(self, verification_id: str, program_id:str) -> None: self.program_id = program_id self.verification_id = verification_id self.fingerprint = RandomFingerprint() self.device_fingerprint_hash = self.fingerprint.device_fingerprint_hash self.identity = RandomIdentity() logger.info(f"指纹初始化: Chrome {self.fingerprint.chrome_version}") logger.info(f"身份初始化: {self.identity.first_name} {self.identity.last_name}, DOB: {self.identity.birth_date}") # 配置代理 proxies = None if PROXY_URL: proxies = PROXY_URL logger.info("已启用代理连接") # 修复:httpx 使用 'proxy' 而不是 'proxies' self.http_client = httpx.Client( timeout=30.0, follow_redirects=True, http2=False, proxy=proxies ) def __del__(self): if hasattr(self, "http_client"): self.http_client.close() @staticmethod def parse_verification_id(url: str) -> Optional[str]: match = re.search(r"verificationId=([a-f0-9]+)", url, re.IGNORECASE) return match.group(1) if match else None @staticmethod def parse_program_id(url: str) -> Optional[str]: match = re.search(r"verify/([a-f0-9]+)/", url, re.IGNORECASE) return match.group(1) if match else None def _get_common_headers(self, referer_vid: str = None) -> Dict: vid = referer_vid if referer_vid else self.verification_id base_headers = { 'host': 'services.sheerid.com', 'accept': 'application/json, text/plain, */*', 'accept-language': 'en-US,en;q=0.9', 'clientname': 'jslib', 'clientversion': '2.157.0', 'origin': 'https://services.sheerid.com', 'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={vid}', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'priority': 'u=1, i', } base_headers.update(self.fingerprint.get_headers()) return base_headers def verify(self, email_address: str) -> Dict: try: logger.info(f"使用邮箱: {email_address}") time.sleep(random.uniform(1.0, 2.5)) # --- 步骤 0: 握手 --- logger.info(">>> 步骤 0: 初始化会话...") init_url = f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}" init_headers = { 'host': 'services.sheerid.com', 'upgrade-insecure-requests': '1', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'none', 'sec-fetch-user': '?1', } init_headers.update(self.fingerprint.get_headers()) self.http_client.get(init_url, headers=init_headers) time.sleep(random.uniform(0.5, 1.5)) # --- 步骤 1: 提交 Status --- logger.info(">>> 步骤 1: 提交 Status...") step1_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus" resp1 = self.http_client.post(step1_url, json={'status': 'VETERAN'}, headers=self._get_common_headers()) if resp1.status_code != 200: raise Exception(f"步骤 1 失败 ({resp1.status_code}): {resp1.text}") data1 = resp1.json() time.sleep(random.uniform(0.5, 1.5)) # --- 步骤 2: 提交个人信息 --- submissionUrl = data1.get("submissionUrl") current_vid = self.verification_id new_vid_match = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE) if new_vid_match: new_vid = new_vid_match.group(1) if new_vid != self.verification_id: current_vid = new_vid verify_body = { "firstName": self.identity.first_name, "lastName": self.identity.last_name, "birthDate": self.identity.birth_date, "email": email_address, "phoneNumber": "", "organization": {"id": 4070, "name": "Army"}, "dischargeDate": self.identity.discharge_date, 'deviceFingerprintHash': self.device_fingerprint_hash, "locale": "en-US", "country": "US", "metadata": { "marketConsentValue": False, "refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={current_vid}", "verificationId": current_vid, "flags": "{\"doc-upload-considerations\":\"default\",\"doc-upload-may24\":\"default\",\"doc-upload-redesign-use-legacy-message-keys\":false,\"docUpload-assertion-checklist\":\"default\",\"include-cvec-field-france-student\":\"not-labeled-optional\",\"org-search-overlay\":\"default\",\"org-selected-display\":\"default\"}", "submissionOptIn": "By submitting..." } } logger.info(f">>> 步骤 2: 提交个人信息 ({self.identity.first_name} {self.identity.last_name})...") resp2 = self.http_client.post(submissionUrl, json=verify_body, headers=self._get_common_headers(current_vid)) if resp2.status_code == 429: logger.error("🛑 触发 429 风控! 建议更换 IP 或暂停运行。") raise Exception("Rate Limit / Verification Limit Exceeded",resp2.text) if resp2.status_code != 200: logger.error(f"步骤 2 响应: {resp2.text}") raise Exception(f"步骤 2 失败 ({resp2.status_code})") data2 = resp2.json() if data2.get("currentStep") == "error": raise Exception(f"逻辑错误: {data2.get('errorIds')} - {data2.get('systemErrorMessage')}") logger.info(f"步骤 2 成功! Current Step: {data2.get('currentStep')}") logger.info(f"\n✅ 验证邮件已发送到: {email_address}") logger.info("请检查邮箱并点击验证链接完成验证!") return { "success": True, "email": email_address, "message": "验证邮件已发送,请检查邮箱" } except Exception as e: logger.error(f"❌ 失败: {e}") return {"success": False, "message": str(e)} # 增加一个验证邮箱的token的步骤 def verify_email_token(self, email_token): try: logger.info(">>> 步骤 3: 验证邮箱 token...") step3_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/emailLoop" resp3 = self.http_client.post(step3_url, json={"emailToken": email_token, "deviceFingerprintHash": self.device_fingerprint_hash}, headers=self._get_common_headers()) if resp3.status_code != 200: raise Exception(f"步骤 3 失败 ({resp3.status_code}): {resp3.text}") data3 = resp3.json() time.sleep(random.uniform(0.5, 1.5)) if data3.get("currentStep") == "error": raise Exception(f"逻辑错误: {data3.get('errorIds')} - {data3.get('systemErrorMessage')}") logger.info(f"步骤 3 成功! Current Step: {data3.get('currentStep')}") return {"success": True, "message": "邮箱 token 验证成功"} except Exception as e: logger.error(f"❌ 失败: {e}") return {"success": False, "message": str(e)} def main(): print("=" * 50) print("SheerID 验证工具") print("=" * 50) # 获取邮箱地址 # email_address = input("请输入您的邮箱地址: ").strip() mail = MailClient() email_address = mail.generate_email() if not email_address: print("错误: 邮箱地址不能为空") return # 获取验证URL if len(sys.argv) > 1: url = sys.argv[1] else: url = input("请输入验证 URL: ").strip() vid = SheerIDVerifier.parse_verification_id(url) pid = SheerIDVerifier.parse_program_id(url) if not vid or not pid: print("URL 解析失败") return print("\n开始验证流程...\n") verifier = SheerIDVerifier(vid, pid) res = verifier.verify(email_address) print("\n" + "="*50) if res['success']: print(f"✅ {res.get('message')}") print(f"邮箱: {res.get('email')}") time.sleep(20) # 轮询等待 SheerID 验证邮件 emails = None while True: emails = mail.get_emails(email_address) if emails and len(emails) > 0 and emails[0].get('sender') == "Verify@SheerID.com": print("收到验证邮件") break print("等待验证邮件...") time.sleep(5) # 获取邮件内容 content = mail.get_email_detail(emails[0].get('id')) verification_link = extract_verification_link(content) # 提取验证链接 email_token = extract_email_token(verification_link) # 从链接中提取 token email_result = verifier.verify_email_token(email_token) # 判断是否成功 if email_result['success']: print("✅ 邮箱 token 验证成功") logger.info(email_result) else: print("❌ 邮箱 token 验证失败") logger.error(email_result) mail.delete_mailbox(email_address) else: print(f"❌ 失败: {res.get('message')}") print("="*50) if __name__ == '__main__': main()