import json import random import string from typing import Optional, Dict, Tuple import time import httpx import re import logging import sys import datetime import os # --- 配置日志 --- logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s' ) logger = logging.getLogger(__name__) # --- 代理池配置 --- PROXY_POOL_ENABLED = os.getenv("PROXY_POOL_ENABLED", "1").strip().lower() not in {"0", "false", "no", "off"} # 是否启用代理池 PROXY_POOL_API = os.getenv("PROXY_POOL_API", "https://getproxy.mygoband.com/v1/proxies/next").strip() PROXY_POOL_TOKEN = os.getenv("PROXY_POOL_TOKEN", "").strip() # 你的 API key(不需要带 Bearer 前缀) PROXY_POOL_AUTH = f"Bearer {PROXY_POOL_TOKEN}" if PROXY_POOL_TOKEN else "" # Authorization: Bearer # 静态代理(当代理池不可用/关闭时作为兜底);留空则直连 PROXY_URL = os.getenv("PROXY_URL", "").strip() or None # 例: http://username:password@ip:port # --- 代理池管理器 --- class ProxyPool: """ 代理池管理器,从代理服务端点获取随机代理 """ def __init__(self, api_url: str, auth_token: str): self.api_url = api_url self.auth_token = auth_token self.current_proxy = None def get_proxy(self) -> str | None: """从代理池API获取一个新的代理""" try: # 解析URL if not self.auth_token: logger.error("获取代理失败: 未设置 PROXY_POOL_TOKEN(Authorization 为空)") return None headers = {'Authorization': self.auth_token} # 使用 httpx 发起请求 with httpx.Client(timeout=10.0) as client: res = client.get(self.api_url, headers=headers) if res.status_code != 200: logger.error(f"获取代理失败 ({res.status_code}): {res.text}") return None # 解析响应 response_data = res.json() # 提取代理信息 # 响应格式: {"proxy": {"protocol": "http", "host": "x.x.x.x", "port": 443, "username": "user", "password": "pass"}, "lease_id": "...", "ttl_ms": 59994} if 'proxy' in response_data and isinstance(response_data['proxy'], dict): proxy_info = response_data['proxy'] protocol = proxy_info.get('protocol', 'http') proxy_host = proxy_info.get('host') proxy_port = proxy_info.get('port') username = proxy_info.get('username') password = proxy_info.get('password') # 强制使用 http 协议,不使用 https protocol = 'http' if proxy_host and proxy_port: # 如果有用户名和密码,添加到代理 URL 中 if username and password: proxy = f"{protocol}://{username}:{password}@{proxy_host}:{proxy_port}" logger.info(f"代理需要认证: {username}@{proxy_host}:{proxy_port}") else: proxy = f"{protocol}://{proxy_host}:{proxy_port}" else: logger.error(f"代理信息不完整: {proxy_info}") return None elif 'proxy' in response_data and isinstance(response_data['proxy'], str): # 如果是字符串格式(可能已经包含认证信息) proxy = response_data['proxy'] # 强制替换 https 为 http if proxy.startswith('https://'): proxy = proxy.replace('https://', 'http://', 1) elif 'host' in response_data: # 兼容旧格式 proxy_host = response_data.get('host') proxy_port = response_data.get('port') username = response_data.get('username', '') password = response_data.get('password', '') if username and password: proxy = f"http://{username}:{password}@{proxy_host}:{proxy_port}" else: proxy = f"http://{proxy_host}:{proxy_port}" else: # 如果是纯文本格式的代理地址 proxy = res.text.strip() # 强制替换 https 为 http if proxy.startswith('https://'): proxy = proxy.replace('https://', 'http://', 1) self.current_proxy = proxy logger.info(f"获取到新代理: {proxy}") return proxy except Exception as e: logger.error(f"获取代理请求失败: {e}") return None def refresh_proxy(self) -> str | None: """刷新代理(获取新的代理)""" return self.get_proxy() # --- 随机身份生成器 --- class RandomIdentity: def __init__(self): self.first_names = ["KELLY","RYAN"] self.last_names = ["BURKHART","Pitts"] random_num = random.randint(0, 1) middle_name = "" for i in range(random.randint(5, 10)): middle_name += random.choice(string.ascii_uppercase) self.first_name = self.first_names[random_num] + " " + middle_name self.last_name = self.last_names[random_num] # 生日在这两个里面选择1981-05-14 1985-10-01,使用上面的random_num来选择 self.birth_date = "1981-05-14" if random_num == 0 else "1985-10-01" # 随机退伍日期 (最近3年) d_year = random.randint(2025, 2025) d_month = random.randint(1, 12) d_day = random.randint(1, 28) self.discharge_date = f"{d_year}-{d_month:02d}-{d_day:02d}" # --- 随机指纹生成器 --- class RandomFingerprint: def __init__(self): self.chrome_version = random.randint(120, 133) self.build_version = random.randint(0, 5000) self.patch_version = random.randint(0, 150) self.platform = "Windows" if random.random() < 0.8 else "Linux" self.platform_os = '"Windows"' if self.platform == "Windows" else '"Linux"' if self.platform == "Windows": self.ua_string = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36' else: self.ua_string = f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36' # 生成设备指纹哈希 (32位十六进制字符串) self.device_fingerprint_hash = ''.join(random.choice('0123456789abcdef') for _ in range(32)) def get_headers(self) -> Dict: return { 'sec-ch-ua': f'"Chromium";v="{self.chrome_version}", "Not(A:Brand";v="24", "Google Chrome";v="{self.chrome_version}"', 'sec-ch-ua-mobile': '?0', 'sec-ch-ua-platform': self.platform_os, 'user-agent': self.ua_string } # --- 辅助函数 --- def extract_verification_link(content: str) -> str | None: """从邮件内容提取 SheerID 验证链接""" if not content: return None # 优先匹配 href 属性中的链接 match = re.search(r'href="(https://services\.sheerid\.com/verify/[^"]+emailToken=[^"]+)"', content) if match: return match.group(1).replace('&', '&') # 备用:直接匹配 URL match = re.search(r'https://services\.sheerid\.com/verify/[^\s<>"]+emailToken=\d+', content) if match: return match.group(0) return None def extract_email_token(url: str) -> str | None: """从验证链接提取 emailToken""" if not url: return None match = re.search(r'emailToken=(\d+)', url) return match.group(1) if match else None class MailClient: """ 邮件客户端类,用于与临时邮箱 API 交互。 邮箱操作使用直连,不使用代理 """ def __init__(self, base_url: str = "https://mail.copy.qzz.io/api", auth_token: str = 'ks4cPQfAvMQI1iqhFCDJTGpuc5ez6B95Iaf9SzA47MP0LiH5Pv7urHjjvVsHJlCY'): self.base_url = base_url self.headers = {'Authorization': f'Bearer {auth_token}'} # 邮箱操作使用直连,不使用代理 self.http_client = httpx.Client(timeout=30.0) def __del__(self): if hasattr(self, "http_client"): self.http_client.close() def generate_email(self, length: int = 10, domain_index: int = 0) -> str | None: """生成一个新的临时邮箱地址""" try: response = self.http_client.get( f"{self.base_url}/generate", headers=self.headers, params={'length': length, 'domainIndex': domain_index} ) if response.status_code != 200: logger.error(f"生成邮箱失败 ({response.status_code}): {response.text}") return None return response.json().get('email') except Exception as e: logger.error(f"生成邮箱请求失败: {e}") return None def get_emails(self, mailbox: str) -> list | None: """获取指定邮箱的邮件列表""" try: response = self.http_client.get( f"{self.base_url}/emails", headers=self.headers, params={'mailbox': mailbox} ) if response.status_code != 200: logger.error(f"获取邮件列表失败 ({response.status_code}): {response.text}") return None return response.json() except Exception as e: logger.error(f"获取邮件列表请求失败: {e}") return None def get_email_detail(self, email_id: int) -> str | None: """获取指定邮件的详细内容""" try: response = self.http_client.get( f"{self.base_url}/email/{email_id}", headers=self.headers ) if response.status_code != 200: logger.error(f"获取邮件详情失败 ({response.status_code}): {response.text}") return None data = response.json() return data.get('html_content') or data.get('content') except Exception as e: logger.error(f"获取邮件详情请求失败: {e}") return None def delete_mailbox(self, address: str) -> bool: """删除指定的邮箱""" if not address: return False try: response = self.http_client.delete( f"{self.base_url}/mailboxes", headers=self.headers, params={'address': address} ) if response.status_code not in [200, 204]: logger.error(f"删除邮箱失败 ({response.status_code}): {response.text}") return False return True except Exception as e: logger.error(f"删除邮箱请求失败: {e}") return False # --- 主逻辑类 --- class SheerIDVerifier: def __init__(self, verification_id: str, program_id: str, proxy_pool: ProxyPool = None, log_callback=None) -> None: self.program_id = program_id self.verification_id = verification_id self.fingerprint = RandomFingerprint() self.device_fingerprint_hash = self.fingerprint.device_fingerprint_hash self.identity = RandomIdentity() self.proxy_pool = proxy_pool self.log_callback = log_callback self.log(f"指纹初始化: Chrome {self.fingerprint.chrome_version}") self.log(f"身份初始化: {self.identity.first_name} {self.identity.last_name}, DOB: {self.identity.birth_date}") # 配置代理 - 获取一个代理并在整个验证过程中使用同一个代理 self.proxy_url = None if self.proxy_pool: self.proxy_url = self.proxy_pool.get_proxy() if self.proxy_url: self.log(f"已启用代理池连接: {self.proxy_url}") else: logger.warning("代理池获取失败,使用静态代理或直连") if not self.proxy_url and PROXY_URL: self.proxy_url = PROXY_URL self.log("已启用静态代理连接") # 修复:httpx 使用 'proxy' 而不是 'proxies' self.http_client = httpx.Client( timeout=30.0, follow_redirects=True, http2=False, proxy=self.proxy_url ) def __del__(self): if hasattr(self, "http_client"): self.http_client.close() def log(self, msg): logger.info(msg) if self.log_callback: try: self.log_callback(msg) except Exception: pass @staticmethod def parse_verification_id(url: str) -> Optional[str]: match = re.search(r"verificationId=([a-f0-9]+)", url, re.IGNORECASE) return match.group(1) if match else None @staticmethod def parse_program_id(url: str) -> Optional[str]: match = re.search(r"verify/([a-f0-9]+)/", url, re.IGNORECASE) return match.group(1) if match else None def _get_common_headers(self, referer_vid: str = None) -> Dict: vid = referer_vid if referer_vid else self.verification_id base_headers = { 'host': 'services.sheerid.com', 'accept': 'application/json, text/plain, */*', 'accept-language': 'en-US,en;q=0.9', 'clientname': 'jslib', 'clientversion': '2.157.0', 'origin': 'https://services.sheerid.com', 'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={vid}', 'sec-fetch-dest': 'empty', 'sec-fetch-mode': 'cors', 'sec-fetch-site': 'same-origin', 'priority': 'u=1, i', } base_headers.update(self.fingerprint.get_headers()) return base_headers def verify(self, email_address: str) -> Dict: try: self.log(f"使用邮箱: {email_address}") time.sleep(random.uniform(1.0, 2.5)) # --- 步骤 0: 握手 --- self.log(">>> 步骤 0: 初始化会话...") init_url = f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}" init_headers = { 'host': 'services.sheerid.com', 'upgrade-insecure-requests': '1', 'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8', 'sec-fetch-dest': 'document', 'sec-fetch-mode': 'navigate', 'sec-fetch-site': 'none', 'sec-fetch-user': '?1', } init_headers.update(self.fingerprint.get_headers()) self.http_client.get(init_url, headers=init_headers) time.sleep(random.uniform(0.5, 1.5)) # --- 步骤 1: 提交 Status --- self.log(">>> 步骤 1: 提交 Status...") step1_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus" resp1 = self.http_client.post(step1_url, json={'status': 'VETERAN'}, headers=self._get_common_headers()) if resp1.status_code != 200: raise Exception(f"步骤 1 失败 ({resp1.status_code}): {resp1.text}") data1 = resp1.json() time.sleep(random.uniform(0.5, 1.5)) # --- 步骤 2: 提交个人信息 --- submissionUrl = data1.get("submissionUrl") current_vid = self.verification_id new_vid_match = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE) if new_vid_match: new_vid = new_vid_match.group(1) if new_vid != self.verification_id: current_vid = new_vid verify_body = { "firstName": self.identity.first_name, "lastName": self.identity.last_name, "birthDate": self.identity.birth_date, "email": email_address, "phoneNumber": "", "organization": {"id": 4070, "name": "Army"}, "dischargeDate": self.identity.discharge_date, 'deviceFingerprintHash': self.device_fingerprint_hash, "locale": "en-US", "country": "US", "metadata": { "marketConsentValue": False, "refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={current_vid}", "verificationId": current_vid, "flags": "{\"doc-upload-considerations\":\"default\",\"doc-upload-may24\":\"default\",\"doc-upload-redesign-use-legacy-message-keys\":false,\"docUpload-assertion-checklist\":\"default\",\"include-cvec-field-france-student\":\"not-labeled-optional\",\"org-search-overlay\":\"default\",\"org-selected-display\":\"default\"}", "submissionOptIn": "By submitting..." } } self.log(f">>> 步骤 2: 提交个人信息 ({self.identity.first_name} {self.identity.last_name})...") resp2 = self.http_client.post(submissionUrl, json=verify_body, headers=self._get_common_headers(current_vid)) if resp2.status_code == 429: self.log("🛑 触发 429 风控! 建议更换 IP 或暂停运行。") raise Exception("Rate Limit / Verification Limit Exceeded",resp2.text) if resp2.status_code != 200: self.log(f"步骤 2 响应: {resp2.text}") raise Exception(f"步骤 2 失败 ({resp2.status_code})") data2 = resp2.json() if data2.get("currentStep") == "error": raise Exception(f"逻辑错误: {data2.get('errorIds')} - {data2.get('systemErrorMessage')}") self.log(f"步骤 2 成功! Current Step: {data2.get('currentStep')}") self.log(f"\n✅ 验证邮件已发送到: {email_address}") self.log("请检查邮箱并点击验证链接完成验证!") return { "success": True, "email": email_address, "message": "验证邮件已发送,请检查邮箱" } except Exception as e: logger.error(f"❌ 失败: {e}") # 删除邮箱 if hasattr(self, 'mail'): self.mail.delete_mailbox(email_address) self.log(f"邮箱 {email_address} 已删除") return {"success": False, "message": str(e)} # 增加一个验证邮箱的token的步骤 def verify_email_token(self, email_token): try: self.log(">>> 步骤 3: 验证邮箱 token...") step3_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/emailLoop" resp3 = self.http_client.post(step3_url, json={"emailToken": email_token, "deviceFingerprintHash": self.device_fingerprint_hash}, headers=self._get_common_headers()) if resp3.status_code != 200: raise Exception(f"步骤 3 失败 ({resp3.status_code}): {resp3.text}") data3 = resp3.json() time.sleep(random.uniform(0.5, 1.5)) if data3.get("currentStep") == "error": raise Exception(f"逻辑错误: {data3.get('errorIds')} - {data3.get('systemErrorMessage')}") self.log(f"步骤 3 成功! Current Step: {data3.get('currentStep')}") return {"success": True, "message": "邮箱 token 验证成功"} except Exception as e: logger.error(f"❌ 失败: {e}") return {"success": False, "message": str(e)} def process_verification(verification_url: str, log_callback=None) -> Dict: """ 执行完整的验证流程 """ def log(msg): logger.info(msg) if log_callback: try: log_callback(msg) except Exception: pass try: # 初始化代理池 proxy_pool = None if PROXY_POOL_ENABLED: log("启用代理池模式") if not PROXY_POOL_AUTH: log("⚠️ 代理池已启用但未设置 PROXY_POOL_TOKEN,自动跳过代理池(改用静态代理或直连)") else: proxy_pool = ProxyPool(PROXY_POOL_API, PROXY_POOL_AUTH) # 获取邮箱地址 mail = MailClient() email_address = mail.generate_email() if not email_address: return {"success": False, "message": "错误: 邮箱地址生成失败"} # 解析 URL vid = SheerIDVerifier.parse_verification_id(verification_url) pid = SheerIDVerifier.parse_program_id(verification_url) if not vid or not pid: return {"success": False, "message": "URL 解析失败"} log("\n开始验证流程...\n") verifier = SheerIDVerifier(vid, pid, proxy_pool=proxy_pool, log_callback=log_callback) res = verifier.verify(email_address) if res['success']: log(f"✅ {res.get('message')}") log(f"邮箱: {res.get('email')}") # 轮询等待 SheerID 验证邮件 emails = None max_retries = 20 # 最大重试次数 (约 100 秒) retry_count = 0 while retry_count < max_retries: time.sleep(5) retry_count += 1 log(f"等待验证邮件... ({retry_count}/{max_retries})") emails = mail.get_emails(email_address) if emails and len(emails) > 0: # 检查是否有来自 SheerID 的邮件 sheerid_email = next((e for e in emails if e.get('sender') == "Verify@SheerID.com"), None) if sheerid_email: log("收到验证邮件") emails = [sheerid_email] # 只处理这封邮件 break if not emails or len(emails) == 0: mail.delete_mailbox(email_address) return {"success": False, "message": "超时未收到验证邮件"} # 获取邮件内容 content = mail.get_email_detail(emails[0].get('id')) verification_link = extract_verification_link(content) email_token = extract_email_token(verification_link) if not email_token: mail.delete_mailbox(email_address) return {"success": False, "message": "无法提取 email token"} # 验证邮箱 token email_result = verifier.verify_email_token(email_token) mail.delete_mailbox(email_address) if email_result['success']: log("✅ 邮箱 token 验证成功") return {"success": True, "message": "验证流程全部完成", "email": email_address, "details": email_result} else: log("❌ 邮箱 token 验证失败") return {"success": False, "message": "邮箱 token 验证失败", "details": email_result} else: return {"success": False, "message": res.get('message')} except Exception as e: logger.error(f"验证过程发生异常: {e}") return {"success": False, "message": str(e)} def main(): print("=" * 50) print("SheerID 验证工具") print("=" * 50) # 获取验证URL if len(sys.argv) > 1: url = sys.argv[1] else: url = input("请输入验证 URL: ").strip() if not url: print("URL 不能为空") return result = process_verification(url) print("\n" + "="*50) print(f"最终结果: {'成功' if result['success'] else '失败'}") print(result.get('message')) print("="*50) if __name__ == '__main__': main()