加入代理 机器人

This commit is contained in:
dela
2026-01-07 16:40:28 +08:00
parent 90a9d369b6
commit 2d509c3807
7 changed files with 887 additions and 359 deletions

View File

@@ -5,446 +5,589 @@ from typing import Optional, Dict, Tuple
import time
import httpx
import re
import requests
import logging
import sys
import datetime
import os
from anyio import sleep
# --- 配置日志 ---
logging.basicConfig(
level=logging.DEBUG,
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# --- 配置信息 ---
BASE_URL = "https://mail.copy.qzz.io/api"
AUTH_TOKEN = 'ks4cPQfAvMQI1iqhFCDJTGpuc5ez6B95Iaf9SzA47MP0LiH5Pv7urHjjvVsHJlCY'
# --- 公共变量 ---
headers = {
'Authorization': 'Bearer ' + AUTH_TOKEN
}
# --- 代理池配置 ---
PROXY_POOL_ENABLED = os.getenv("PROXY_POOL_ENABLED", "1").strip().lower() not in {"0", "false", "no", "off"} # 是否启用代理池
PROXY_POOL_API = os.getenv("PROXY_POOL_API", "https://getproxy.mygoband.com/v1/proxies/next").strip()
PROXY_POOL_TOKEN = os.getenv("PROXY_POOL_TOKEN", "").strip() # 你的 API key不需要带 Bearer 前缀)
PROXY_POOL_AUTH = f"Bearer {PROXY_POOL_TOKEN}" if PROXY_POOL_TOKEN else "" # Authorization: Bearer <token>
# 静态代理(当代理池不可用/关闭时作为兜底);留空则直连
PROXY_URL = os.getenv("PROXY_URL", "").strip() or None
# 例: http://username:password@ip:port
def extract_verification_link(content):
"""从邮件内容提取验证链接"""
# --- 代理池管理器 ---
class ProxyPool:
"""
代理池管理器,从代理服务端点获取随机代理
"""
def __init__(self, api_url: str, auth_token: str):
self.api_url = api_url
self.auth_token = auth_token
self.current_proxy = None
def get_proxy(self) -> str | None:
"""从代理池API获取一个新的代理"""
try:
# 解析URL
if not self.auth_token:
logger.error("获取代理失败: 未设置 PROXY_POOL_TOKENAuthorization 为空)")
return None
headers = {'Authorization': self.auth_token}
# 使用 httpx 发起请求
with httpx.Client(timeout=10.0) as client:
res = client.get(self.api_url, headers=headers)
if res.status_code != 200:
logger.error(f"获取代理失败 ({res.status_code}): {res.text}")
return None
# 解析响应
response_data = res.json()
# 提取代理信息
# 响应格式: {"proxy": {"protocol": "http", "host": "x.x.x.x", "port": 443, "username": "user", "password": "pass"}, "lease_id": "...", "ttl_ms": 59994}
if 'proxy' in response_data and isinstance(response_data['proxy'], dict):
proxy_info = response_data['proxy']
protocol = proxy_info.get('protocol', 'http')
proxy_host = proxy_info.get('host')
proxy_port = proxy_info.get('port')
username = proxy_info.get('username')
password = proxy_info.get('password')
# 强制使用 http 协议,不使用 https
protocol = 'http'
if proxy_host and proxy_port:
# 如果有用户名和密码,添加到代理 URL 中
if username and password:
proxy = f"{protocol}://{username}:{password}@{proxy_host}:{proxy_port}"
logger.info(f"代理需要认证: {username}@{proxy_host}:{proxy_port}")
else:
proxy = f"{protocol}://{proxy_host}:{proxy_port}"
else:
logger.error(f"代理信息不完整: {proxy_info}")
return None
elif 'proxy' in response_data and isinstance(response_data['proxy'], str):
# 如果是字符串格式(可能已经包含认证信息)
proxy = response_data['proxy']
# 强制替换 https 为 http
if proxy.startswith('https://'):
proxy = proxy.replace('https://', 'http://', 1)
elif 'host' in response_data:
# 兼容旧格式
proxy_host = response_data.get('host')
proxy_port = response_data.get('port')
username = response_data.get('username', '')
password = response_data.get('password', '')
if username and password:
proxy = f"http://{username}:{password}@{proxy_host}:{proxy_port}"
else:
proxy = f"http://{proxy_host}:{proxy_port}"
else:
# 如果是纯文本格式的代理地址
proxy = res.text.strip()
# 强制替换 https 为 http
if proxy.startswith('https://'):
proxy = proxy.replace('https://', 'http://', 1)
self.current_proxy = proxy
logger.info(f"获取到新代理: {proxy}")
return proxy
except Exception as e:
logger.error(f"获取代理请求失败: {e}")
return None
def refresh_proxy(self) -> str | None:
"""刷新代理(获取新的代理)"""
return self.get_proxy()
# --- 随机身份生成器 ---
class RandomIdentity:
def __init__(self):
self.first_names = ["KELLY","RYAN"]
self.last_names = ["BURKHART","Pitts"]
random_num = random.randint(0, 1)
middle_name = ""
for i in range(random.randint(5, 10)):
middle_name += random.choice(string.ascii_uppercase)
self.first_name = self.first_names[random_num] + " " + middle_name
self.last_name = self.last_names[random_num]
# 生日在这两个里面选择1981-05-14 1985-10-01使用上面的random_num来选择
self.birth_date = "1981-05-14" if random_num == 0 else "1985-10-01"
# 随机退伍日期 (最近3年)
d_year = random.randint(2025, 2025)
d_month = random.randint(1, 12)
d_day = random.randint(1, 28)
self.discharge_date = f"{d_year}-{d_month:02d}-{d_day:02d}"
# --- 随机指纹生成器 ---
class RandomFingerprint:
def __init__(self):
self.chrome_version = random.randint(120, 133)
self.build_version = random.randint(0, 5000)
self.patch_version = random.randint(0, 150)
self.platform = "Windows" if random.random() < 0.8 else "Linux"
self.platform_os = '"Windows"' if self.platform == "Windows" else '"Linux"'
if self.platform == "Windows":
self.ua_string = f'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36'
else:
self.ua_string = f'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/{self.chrome_version}.0.{self.build_version}.{self.patch_version} Safari/537.36'
# 生成设备指纹哈希 (32位十六进制字符串)
self.device_fingerprint_hash = ''.join(random.choice('0123456789abcdef') for _ in range(32))
def get_headers(self) -> Dict:
return {
'sec-ch-ua': f'"Chromium";v="{self.chrome_version}", "Not(A:Brand";v="24", "Google Chrome";v="{self.chrome_version}"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': self.platform_os,
'user-agent': self.ua_string
}
# --- 辅助函数 ---
def extract_verification_link(content: str) -> str | None:
"""从邮件内容提取 SheerID 验证链接"""
if not content:
return None
# 优先匹配 href 属性中的链接
match = re.search(r'href="(https://services\.sheerid\.com/verify/[^"]+emailToken=[^"]+)"', content)
if match:
return match.group(1).replace('&amp;', '&')
# 备用:直接匹配 URL
match = re.search(r'https://services\.sheerid\.com/verify/[^\s<>"]+emailToken=\d+', content)
if match:
return match.group(0)
return None
def generate_new_email(length: int = 8, domain_index: int = 0) -> str | None:
def extract_email_token(url: str) -> str | None:
"""从验证链接提取 emailToken"""
if not url:
return None
match = re.search(r'emailToken=(\d+)', url)
return match.group(1) if match else None
class MailClient:
"""
向 API 请求生成一个新的临时邮箱地址
Args:
length (int): 邮箱地址的用户名长度。
domain_index (int): 使用的域名索引。
Returns:
str | None: 成功时返回邮箱地址字符串,失败时返回 None。
邮件客户端类,用于与临时邮箱 API 交互
邮箱操作使用直连,不使用代理
"""
# 1. 明确操作意图,并记录请求参数
operation = "生成新邮箱地址"
endpoint = f"{BASE_URL}/generate"
params = {'length': length, 'domainIndex': domain_index}
logger.info(f"开始操作: {operation} (参数: {params})")
def __init__(self, base_url: str = "https://mail.copy.qzz.io/api",
auth_token: str = 'ks4cPQfAvMQI1iqhFCDJTGpuc5ez6B95Iaf9SzA47MP0LiH5Pv7urHjjvVsHJlCY'):
self.base_url = base_url
self.headers = {'Authorization': f'Bearer {auth_token}'}
try:
# 使用 params 参数,让 requests 自动处理 URL 查询字符串
logger.debug(f"发送 GET 请求至: {endpoint}")
response = requests.get(endpoint, headers=headers, params=params)
# 2. 关联请求与响应
logger.info(f"操作 '{operation}' 收到响应,状态码: {response.status_code}")
# 检查 HTTP 错误状态码
response.raise_for_status()
# 尝试解析 JSON
data = response.json()
logger.debug(f"成功解析 JSON 响应: \n{json.dumps(data, indent=2)}")
email = data.get('email')
if email:
# 3. 增加成功日志
logger.info(f"成功 {operation},获取到邮箱: {email}")
return email
else:
logger.error(f"操作 '{operation}' 失败: 响应 JSON 中缺少 'email' 键。")
# 邮箱操作使用直连,不使用代理
self.http_client = httpx.Client(timeout=30.0)
def __del__(self):
if hasattr(self, "http_client"):
self.http_client.close()
def generate_email(self, length: int = 10, domain_index: int = 0) -> str | None:
"""生成一个新的临时邮箱地址"""
try:
response = self.http_client.get(
f"{self.base_url}/generate",
headers=self.headers,
params={'length': length, 'domainIndex': domain_index}
)
if response.status_code != 200:
logger.error(f"生成邮箱失败 ({response.status_code}): {response.text}")
return None
return response.json().get('email')
except Exception as e:
logger.error(f"生成邮箱请求失败: {e}")
return None
except requests.exceptions.HTTPError as err:
# 4. 丰富错误上下文
logger.error(f"操作 '{operation}' 失败,发生 HTTP 错误: {err}")
# 在 debug 级别记录完整的服务器响应,避免在 INFO/ERROR 级别刷屏
logger.debug(f"服务器错误响应全文: {err.response.text}")
return None
except requests.exceptions.RequestException as err:
logger.critical(f"操作 '{operation}' 失败,发生严重请求错误 (如网络问题): {err}")
return None
except json.JSONDecodeError:
logger.error(f"操作 '{operation}' 失败: 无法解析服务器响应为 JSON。")
logger.debug(f"服务器返回的非 JSON 内容: '{response.text}'")
return None
def get_mail_detail_from_mailbox(email_address: str, email_id: int) -> str | None:
"""
从指定的邮箱中获取所有邮件,并提取出 SheerID 验证链接。
Args:
email_address (str): 要查询的临时邮箱地址。
auth_token (str): 用于 API 认证的 Bearer Token。
Returns:
list[str]: 一个包含所有找到的 SheerID 链接的列表。
如果找不到链接或发生错误,则返回一个空列表。
"""
base_url = "https://mail.copy.qzz.io/api"
endpoint = f"{base_url}/email/{email_id}"
operation = f"获取邮箱 <{email_address}> 内的 SheerID 链接"
logger.info(f"开始操作: {operation}")
try:
# --- 发送 API 请求 ---
response = requests.get(endpoint, headers=headers)
# --- 解析 JSON ---
data = response.json()
logger.debug(f"成功解析 JSON 响应: \n{json.dumps(data, indent=2)}")
extract_verification_link(data.get('html_content'))
return data.get('html_content')
except requests.exceptions.HTTPError as err:
logger.error(f"操作 '{operation}' 失败: 发生 HTTP 错误: {err}")
logger.debug(f"服务器错误响应全文: {err.response.text}")
return None # 发生错误时返回空列表
except requests.exceptions.RequestException as err:
logger.critical(f"操作 '{operation}' 失败: 发生严重请求错误 (如网络问题): {err}")
return None
except json.JSONDecodeError:
logger.error(f"操作 '{operation}' 失败: 无法解析服务器响应为 JSON。")
logger.debug(f"服务器返回的非 JSON 内容: '{response.text}'")
return None
def get_sheerid_links_from_mailbox(email_address: str) -> str | None:
"""
从指定的邮箱中获取所有邮件,并提取出 SheerID 验证链接。
Args:
email_address (str): 要查询的临时邮箱地址。
auth_token (str): 用于 API 认证的 Bearer Token。
Returns:
list[str]: 一个包含所有找到的 SheerID 链接的列表。
如果找不到链接或发生错误,则返回一个空列表。
"""
base_url = "https://mail.copy.qzz.io/api"
endpoint = f"{base_url}/emails"
params = {
"mailbox": email_address
}
operation = f"获取邮箱 <{email_address}> 内的 SheerID 链接"
logger.info(f"开始操作: {operation}")
try:
# --- 发送 API 请求 ---
response = requests.get(endpoint, headers=headers, params=params)
response.raise_for_status()
logger.info(f"操作 '{operation}' 收到响应,状态码: {response.status_code}")
# --- 解析 JSON ---
data = response.json()
logger.debug(f"成功解析 JSON 响应: \n{json.dumps(data, indent=2)}")
if not data[0].get('sender') == "Verify@SheerID.com":
def get_emails(self, mailbox: str) -> list | None:
"""获取指定邮箱的邮件列表"""
try:
response = self.http_client.get(
f"{self.base_url}/emails",
headers=self.headers,
params={'mailbox': mailbox}
)
if response.status_code != 200:
logger.error(f"获取邮件列表失败 ({response.status_code}): {response.text}")
return None
return response.json()
except Exception as e:
logger.error(f"获取邮件列表请求失败: {e}")
return None
id = data[0].get('id')
logger.debug(id)
content = get_mail_detail_from_mailbox(email_address, id)
url = extract_verification_link(content)
return url
except requests.exceptions.HTTPError as err:
logger.error(f"操作 '{operation}' 失败: 发生 HTTP 错误: {err}")
logger.debug(f"服务器错误响应全文: {err.response.text}")
return None # 发生错误时返回空列表
except requests.exceptions.RequestException as err:
logger.critical(f"操作 '{operation}' 失败: 发生严重请求错误 (如网络问题): {err}")
return None
except json.JSONDecodeError:
logger.error(f"操作 '{operation}' 失败: 无法解析服务器响应为 JSON。")
logger.debug(f"服务器返回的非 JSON 内容: '{response.text}'")
return None
def delete_mailbox(email_address: str) -> bool:
"""
通过 API 删除一个临时邮箱地址(邮箱)。
Args:
email_address (str): 要删除的邮箱地址。
auth_token (str): 用于 API 认证的 Bearer Token。
Returns:
bool: 如果成功删除则返回 True否则返回 False。
"""
# 稍微修改了函数名,因为 API 端点是 /mailboxes更准确
operation = f"删除邮箱 <{email_address}>"
logger.info(f"开始操作: {operation}")
# 1. 输入验证
if not email_address:
logger.warning("尝试删除一个空的邮箱地址,操作已取消。")
return False
endpoint = f"{BASE_URL}/mailboxes"
params = {
'address': email_address
}
try:
# --- 发送 API 请求 ---
response = requests.delete(endpoint, headers=headers, params=params)
logger.info(f"操作 '{operation}' 收到响应,状态码: {response.status_code}")
response.raise_for_status() # 如果状态码是 4xx 或 5xx将在此处引发 HTTPError
# 2. 检查成功响应
# 成功的 DELETE 请求通常返回 204 No Content (无响应体)
if response.status_code == 204:
logger.info(f"成功 {operation} (状态码 204)。")
def get_email_detail(self, email_id: int) -> str | None:
"""获取指定邮件的详细内容"""
try:
response = self.http_client.get(
f"{self.base_url}/email/{email_id}",
headers=self.headers
)
if response.status_code != 200:
logger.error(f"获取邮件详情失败 ({response.status_code}): {response.text}")
return None
data = response.json()
return data.get('html_content') or data.get('content')
except Exception as e:
logger.error(f"获取邮件详情请求失败: {e}")
return None
def delete_mailbox(self, address: str) -> bool:
"""删除指定的邮箱"""
if not address:
return False
try:
response = self.http_client.delete(
f"{self.base_url}/mailboxes",
headers=self.headers,
params={'address': address}
)
if response.status_code not in [200, 204]:
logger.error(f"删除邮箱失败 ({response.status_code}): {response.text}")
return False
return True
# 如果返回 200 OK 并带有 JSON 体
data = response.json()
logger.debug(f"成功 {operation},收到的 JSON 详情: \n{json.dumps(data, indent=2)}")
logger.info(f"成功 {operation} (状态码 {response.status_code})。")
return True
except requests.exceptions.HTTPError as err:
logger.error(f"操作 '{operation}' 失败: 发生 HTTP 错误: {err}")
# 使用 debug 级别记录详细响应,避免刷屏
logger.debug(f"服务器错误响应全文: {err.response.text}")
return False
except requests.exceptions.RequestException as err:
logger.critical(f"操作 '{operation}' 失败: 发生网络等严重请求错误: {err}")
return False
except json.JSONDecodeError:
logger.error(f"操作 '{operation}' 失败: 服务器返回了成功的状态码,但响应不是有效的 JSON。")
logger.debug(f"服务器返回的非 JSON 内容: '{response.text}'")
return False
except Exception as e:
logger.error(f"删除邮箱请求失败: {e}")
return False
# --- 主逻辑类 ---
class SheerIDVerifier:
def __init__(self, verification_id: str,program_id:str) -> None:
def __init__(self, verification_id: str, program_id: str, proxy_pool: ProxyPool = None, log_callback=None) -> None:
self.program_id = program_id
self.verification_id = verification_id
self.device_fingerprint = self._generate_device_fingerprint()
self.http_client = httpx.Client(timeout=30.0)
self.fingerprint = RandomFingerprint()
self.device_fingerprint_hash = self.fingerprint.device_fingerprint_hash
self.identity = RandomIdentity()
self.proxy_pool = proxy_pool
self.log_callback = log_callback
self.log(f"指纹初始化: Chrome {self.fingerprint.chrome_version}")
self.log(f"身份初始化: {self.identity.first_name} {self.identity.last_name}, DOB: {self.identity.birth_date}")
# 配置代理 - 获取一个代理并在整个验证过程中使用同一个代理
self.proxy_url = None
if self.proxy_pool:
self.proxy_url = self.proxy_pool.get_proxy()
if self.proxy_url:
self.log(f"已启用代理池连接: {self.proxy_url}")
else:
logger.warning("代理池获取失败,使用静态代理或直连")
if not self.proxy_url and PROXY_URL:
self.proxy_url = PROXY_URL
self.log("已启用静态代理连接")
# 修复httpx 使用 'proxy' 而不是 'proxies'
self.http_client = httpx.Client(
timeout=30.0,
follow_redirects=True,
http2=False,
proxy=self.proxy_url
)
def __del__(self):
if hasattr(self, "http_client"):
self.http_client.close()
@staticmethod
def _generate_device_fingerprint() -> str:
chars = '0123456789abcdef'
return ''.join(random.choice(chars) for _ in range(32))
@staticmethod
def normalize_url(url: str) -> str:
"""规范化 URL保留原样"""
return url
def log(self, msg):
logger.info(msg)
if self.log_callback:
try:
self.log_callback(msg)
except Exception:
pass
@staticmethod
def parse_verification_id(url: str) -> Optional[str]:
match = re.search(r"verificationId=([a-f0-9]+)", url, re.IGNORECASE)
if match:
return match.group(1)
return None
return match.group(1) if match else None
@staticmethod
def parse_program_id(url: str) -> Optional[str]:
match = re.search(r"verify/([a-f0-9]+)/", url, re.IGNORECASE)
if match:
return match.group(1)
return None
return match.group(1) if match else None
def _sheerid_request(
self, method: str, url: str, body: Optional[Dict] = None
) -> Tuple[Dict, int]:
"""发送 SheerID API 请求"""
headers = {
def _get_common_headers(self, referer_vid: str = None) -> Dict:
vid = referer_vid if referer_vid else self.verification_id
base_headers = {
'host': 'services.sheerid.com',
'sec-ch-ua-platform': '"Windows"',
'sec-ch-ua': f'"Chromium";v="142", "Google Chrome";v="142", "Not_A Brand";v="99"',
'clientversion': '2.157.0',
'sec-ch-ua-mobile': '?0',
'accept': 'application/json, text/plain, */*',
'accept-language': 'en-US,en;q=0.9',
'clientname': 'jslib',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36',
'accept': 'application/json',
'content-type': 'application/json',
'clientversion': '2.157.0',
'origin': 'https://services.sheerid.com',
'sec-fetch-site': 'same-origin',
'sec-fetch-mode': 'cors',
'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={vid}',
'sec-fetch-dest': 'empty',
'referer': f'https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}',
'accept-encoding': 'gzip, deflate, br, zstd',
'accept-language': 'en-US,en-GB;q=0.9,en;q=0.8',
'sec-fetch-mode': 'cors',
'sec-fetch-site': 'same-origin',
'priority': 'u=1, i',
}
base_headers.update(self.fingerprint.get_headers())
return base_headers
def verify(self, email_address: str) -> Dict:
try:
response = self.http_client.request(
method=method, url=url, json=body, headers=headers
)
try:
data = response.json()
except Exception:
data = response.text
return data, response.status_code
except Exception as e:
logger.error(f"SheerID 请求失败: {e}")
raise
self.log(f"使用邮箱: {email_address}")
time.sleep(random.uniform(1.0, 2.5))
def verify(self) -> Dict:
try:
# --- 步骤 0: 握手 ---
self.log(">>> 步骤 0: 初始化会话...")
init_url = f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}"
init_headers = {
'host': 'services.sheerid.com',
'upgrade-insecure-requests': '1',
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,*/*;q=0.8',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
}
init_headers.update(self.fingerprint.get_headers())
self.http_client.get(init_url, headers=init_headers)
time.sleep(random.uniform(0.5, 1.5))
email_address = generate_new_email(10,0)
time.sleep(30)
submit_data, submit_status = self._sheerid_request(
"POST",
f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus",
{'status': 'VETERAN'},
)
if submit_status != 200:
raise Exception(f"提交状态 失败 (状态码 {submit_status}): {submit_data}")
if submit_data.get("currentStep") != "collectInactiveMilitaryPersonalInfo":
error_msg = ", ".join(submit_data.get("errorIds", ["Unknown error"]))
raise Exception(f"提交状态 错误: {error_msg}")
submissionUrl = submit_data.get("submissionUrl")
verificationId = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE)
random_string = ''.join(random.choices(string.ascii_uppercase, k=5))
# --- 步骤 1: 提交 Status ---
self.log(">>> 步骤 1: 提交 Status...")
step1_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/collectMilitaryStatus"
resp1 = self.http_client.post(step1_url, json={'status': 'VETERAN'}, headers=self._get_common_headers())
if resp1.status_code != 200:
raise Exception(f"步骤 1 失败 ({resp1.status_code}): {resp1.text}")
data1 = resp1.json()
time.sleep(random.uniform(0.5, 1.5))
# --- 步骤 2: 提交个人信息 ---
submissionUrl = data1.get("submissionUrl")
current_vid = self.verification_id
new_vid_match = re.search(r"verificationId=([a-f0-9]+)", submissionUrl, re.IGNORECASE)
if new_vid_match:
new_vid = new_vid_match.group(1)
if new_vid != self.verification_id:
current_vid = new_vid
verify_body = {
"firstName": "KELLY " + random_string,
"lastName": "BURKHART",
"birthDate": "1981-05-14",
"firstName": self.identity.first_name,
"lastName": self.identity.last_name,
"birthDate": self.identity.birth_date,
"email": email_address,
"phoneNumber": "",
"organization": {
"id": 4070,
"name": "Army"
},
"dischargeDate": "2025-10-29",
"organization": {"id": 4070, "name": "Army"},
"dischargeDate": self.identity.discharge_date,
'deviceFingerprintHash': self.device_fingerprint_hash,
"locale": "en-US",
"country": "US",
"metadata": {
"marketConsentValue": False,
"refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={self.verification_id}",
"verificationId": verificationId,
"refererUrl": f"https://services.sheerid.com/verify/{self.program_id}/?verificationId={current_vid}",
"verificationId": current_vid,
"flags": "{\"doc-upload-considerations\":\"default\",\"doc-upload-may24\":\"default\",\"doc-upload-redesign-use-legacy-message-keys\":false,\"docUpload-assertion-checklist\":\"default\",\"include-cvec-field-france-student\":\"not-labeled-optional\",\"org-search-overlay\":\"default\",\"org-selected-display\":\"default\"}",
"submissionOptIn": "By submitting the personal information above, I acknowledge that my personal information is being collected under the <a target=\"_blank\" rel=\"noopener noreferrer\" class=\"sid-privacy-policy sid-link\" href=\"https://openai.com/policies/privacy-policy/\">privacy policy</a> of the business from which I am seeking a discount, and I understand that my personal information will be shared with SheerID as a processor/third-party service provider in order for SheerID to confirm my eligibility for a special offer. Contact OpenAI Support for further assistance at support@openai.com"
"submissionOptIn": "By submitting..."
}
}
self.log(f">>> 步骤 2: 提交个人信息 ({self.identity.first_name} {self.identity.last_name})...")
resp2 = self.http_client.post(submissionUrl, json=verify_body, headers=self._get_common_headers(current_vid))
if resp2.status_code == 429:
self.log("🛑 触发 429 风控! 建议更换 IP 或暂停运行。")
raise Exception("Rate Limit / Verification Limit Exceeded",resp2.text)
if resp2.status_code != 200:
self.log(f"步骤 2 响应: {resp2.text}")
raise Exception(f"步骤 2 失败 ({resp2.status_code})")
data2 = resp2.json()
if data2.get("currentStep") == "error":
raise Exception(f"逻辑错误: {data2.get('errorIds')} - {data2.get('systemErrorMessage')}")
self.log(f"步骤 2 成功! Current Step: {data2.get('currentStep')}")
self.log(f"\n✅ 验证邮件已发送到: {email_address}")
self.log("请检查邮箱并点击验证链接完成验证!")
info_data, info_status = self._sheerid_request(
"POST",
submissionUrl,
verify_body,
)
if info_status != 200:
raise Exception(f"提交信息 失败 (状态码 {info_status}): {info_data}")
if info_data.get("currentStep") == "error":
error_msg = ", ".join(info_data.get("errorIds", ["Unknown error"]))
raise Exception(f"提交信息 错误: {error_msg}")
time.sleep(120)
finish_verifying_url = get_sheerid_links_from_mailbox(email_address)
while True:
if finish_verifying_url:
break
time.sleep(120)
finish_verifying_url = get_sheerid_links_from_mailbox(email_address)
logger.debug("完成验证 " + finish_verifying_url)
final_data, final_status = self._sheerid_request(
"POST",
finish_verifying_url,
)
if final_status != 200:
raise Exception(f"完成验证 失败 (状态码 {submit_status}): {submit_data}")
logger.debug(f"完成验证 {json.dumps(final_data, indent=2)}")
time.sleep(30)
delete_mailbox(email_address)
# 不做状态轮询,直接返回等待审核
return {
"success": True,
"pending": True,
"verification_id": self.verification_id,
"redirect_url": final_data.get("redirectUrl"),
"status": final_data,
"email": email_address,
"message": "验证邮件已发送,请检查邮箱"
}
except Exception as e:
logger.error(f"验证失败: {e}")
return {"success": False, "message": str(e), "verification_id": self.verification_id}
logger.error(f"❌ 失败: {e}")
# 删除邮箱
if hasattr(self, 'mail'):
self.mail.delete_mailbox(email_address)
self.log(f"邮箱 {email_address} 已删除")
return {"success": False, "message": str(e)}
# 增加一个验证邮箱的token的步骤
def verify_email_token(self, email_token):
try:
self.log(">>> 步骤 3: 验证邮箱 token...")
step3_url = f"https://services.sheerid.com/rest/v2/verification/{self.verification_id}/step/emailLoop"
resp3 = self.http_client.post(step3_url, json={"emailToken": email_token, "deviceFingerprintHash": self.device_fingerprint_hash}, headers=self._get_common_headers())
if resp3.status_code != 200:
raise Exception(f"步骤 3 失败 ({resp3.status_code}): {resp3.text}")
data3 = resp3.json()
time.sleep(random.uniform(0.5, 1.5))
if data3.get("currentStep") == "error":
raise Exception(f"逻辑错误: {data3.get('errorIds')} - {data3.get('systemErrorMessage')}")
self.log(f"步骤 3 成功! Current Step: {data3.get('currentStep')}")
return {"success": True, "message": "邮箱 token 验证成功"}
except Exception as e:
logger.error(f"❌ 失败: {e}")
return {"success": False, "message": str(e)}
def process_verification(verification_url: str, log_callback=None) -> Dict:
"""
执行完整的验证流程
"""
def log(msg):
logger.info(msg)
if log_callback:
try:
log_callback(msg)
except Exception:
pass
try:
# 初始化代理池
proxy_pool = None
if PROXY_POOL_ENABLED:
log("启用代理池模式")
if not PROXY_POOL_AUTH:
log("⚠️ 代理池已启用但未设置 PROXY_POOL_TOKEN自动跳过代理池改用静态代理或直连")
else:
proxy_pool = ProxyPool(PROXY_POOL_API, PROXY_POOL_AUTH)
# 获取邮箱地址
mail = MailClient()
email_address = mail.generate_email()
if not email_address:
return {"success": False, "message": "错误: 邮箱地址生成失败"}
# 解析 URL
vid = SheerIDVerifier.parse_verification_id(verification_url)
pid = SheerIDVerifier.parse_program_id(verification_url)
if not vid or not pid:
return {"success": False, "message": "URL 解析失败"}
log("\n开始验证流程...\n")
verifier = SheerIDVerifier(vid, pid, proxy_pool=proxy_pool, log_callback=log_callback)
res = verifier.verify(email_address)
if res['success']:
log(f"{res.get('message')}")
log(f"邮箱: {res.get('email')}")
# 轮询等待 SheerID 验证邮件
emails = None
max_retries = 20 # 最大重试次数 (约 100 秒)
retry_count = 0
while retry_count < max_retries:
time.sleep(5)
retry_count += 1
log(f"等待验证邮件... ({retry_count}/{max_retries})")
emails = mail.get_emails(email_address)
if emails and len(emails) > 0:
# 检查是否有来自 SheerID 的邮件
sheerid_email = next((e for e in emails if e.get('sender') == "Verify@SheerID.com"), None)
if sheerid_email:
log("收到验证邮件")
emails = [sheerid_email] # 只处理这封邮件
break
if not emails or len(emails) == 0:
mail.delete_mailbox(email_address)
return {"success": False, "message": "超时未收到验证邮件"}
# 获取邮件内容
content = mail.get_email_detail(emails[0].get('id'))
verification_link = extract_verification_link(content)
email_token = extract_email_token(verification_link)
if not email_token:
mail.delete_mailbox(email_address)
return {"success": False, "message": "无法提取 email token"}
# 验证邮箱 token
email_result = verifier.verify_email_token(email_token)
mail.delete_mailbox(email_address)
if email_result['success']:
log("✅ 邮箱 token 验证成功")
return {"success": True, "message": "验证流程全部完成", "email": email_address, "details": email_result}
else:
log("❌ 邮箱 token 验证失败")
return {"success": False, "message": "邮箱 token 验证失败", "details": email_result}
else:
return {"success": False, "message": res.get('message')}
except Exception as e:
logger.error(f"验证过程发生异常: {e}")
return {"success": False, "message": str(e)}
def main():
"""主函数 - 命令行界面"""
import sys
print("=" * 60)
print("SheerID 学生身份验证工具 (Python版)")
print("=" * 60)
print()
print("=" * 50)
print("SheerID 验证工具")
print("=" * 50)
# 获取验证URL
if len(sys.argv) > 1:
url = sys.argv[1]
else:
url = input("请输入 SheerID 验证 URL: ").strip()
url = input("请输入验证 URL: ").strip()
if not url:
print("❌ 错误: 未提供 URL")
sys.exit(1)
print("URL 不能为空")
return
verification_id = SheerIDVerifier.parse_verification_id(url)
program_id = SheerIDVerifier.parse_program_id(url)
if not verification_id and not program_id:
print("❌ 错误: 无效的验证 ID 格式")
sys.exit(1)
result = process_verification(url)
print("\n" + "="*50)
print(f"最终结果: {'成功' if result['success'] else '失败'}")
print(result.get('message'))
print("="*50)
print(f"✅ 解析到验证 ID: {verification_id}\n✅ 解析到验证 ID: {program_id}", end="\n")
verifier = SheerIDVerifier(verification_id,program_id)
result = verifier.verify()
print()
print("=" * 60)
print("验证结果:")
print("=" * 60)
print(f"状态: {'✅ 成功' if result['success'] else '❌ 失败'}")
print(f"消息: {result['message']}")
if result.get("redirect_url"):
print(f"跳转 URL: {result['redirect_url']}")
print("=" * 60)
return 0 if result["success"] else 1
if __name__ == '__main__':
exit(main())
main()