174 lines
5.8 KiB
Python
174 lines
5.8 KiB
Python
import time
|
|
import re
|
|
import random
|
|
import threading
|
|
import requests as standard_requests # 用于普通API交互
|
|
|
|
|
|
class MailSystem:
    """Client for a single mail-system instance that serves one or more domains.

    On construction the instance logs in with the admin credentials and keeps
    the resulting token in ``self.token`` (``None`` when the login failed) and
    in ``self.headers`` for use by the other API calls.
    """

    # Seconds to wait on each HTTP call. Without a timeout `requests` blocks
    # indefinitely on an unresponsive server. Override per instance if needed.
    request_timeout = 15

    def __init__(self, base_url, admin_email, admin_password, domains):
        """Store the connection settings and fetch an auth token.

        Args:
            base_url: Root URL of the mail system; API paths are appended to it.
            admin_email: Account used to request the token.
            admin_password: Password for ``admin_email``.
            domains: Domains under which this system can create mailboxes.
        """
        self.base_url = base_url
        self.domains = domains  # domains supported by this system
        self.token = self._get_token(admin_email, admin_password)
        self.headers = {"Authorization": self.token}

    def _get_token(self, email, password):
        """Request an auth token; return it, or ``None`` on any failure.

        Never raises: the constructor relies on a ``None`` token to signal an
        unusable system, so every error is reported on stdout instead.
        """
        url = f"{self.base_url}/api/public/genToken"
        payload = {"email": email, "password": password}
        try:
            resp = standard_requests.post(
                url, json=payload, timeout=self.request_timeout
            )
            data = resp.json()
            if data['code'] != 200:
                # Routed through the handler below so that a rejected login and
                # a transport error are reported the same way.
                raise RuntimeError(f"获取Token失败: {data}")
            token = data['data']['token']
            print(f"[+] 令牌获取成功 ({self.base_url}): {token[:10]}...")
            return token
        except Exception as e:
            print(f"[-] 连接邮件系统失败 ({self.base_url}): {e}")
            return None

    def create_user(self, email_prefix, domain=None):
        """Create a mailbox ``<email_prefix>@<domain>`` on this system.

        Args:
            email_prefix: Local part of the address.
            domain: Domain to use; when ``None`` one of ``self.domains`` is
                picked at random.

        Returns:
            The full address on success, otherwise ``None`` (API rejection,
            network error, non-JSON reply, or no domain available).
        """
        if domain is None:
            if not self.domains:
                # random.choice would raise IndexError on an empty sequence.
                print(f"[-] 创建邮箱失败: no domains configured for {self.base_url}")
                return None
            domain = random.choice(self.domains)

        full_email = f"{email_prefix}@{domain}"
        url = f"{self.base_url}/api/public/addUser"
        payload = {
            "list": [
                {
                    "email": full_email,
                    "password": "random_pass_ignoring_this",
                }
            ]
        }

        try:
            resp = standard_requests.post(
                url,
                json=payload,
                headers=self.headers,
                timeout=self.request_timeout,
            )
            created = resp.json().get('code') == 200
        except (standard_requests.RequestException, ValueError, AttributeError) as e:
            # ValueError: body was not JSON; AttributeError: JSON was not an
            # object. Report and return None like the other failure paths.
            print(f"[-] 创建邮箱失败: {e}")
            return None

        if created:
            print(f"[+] 邮箱用户创建成功: {full_email}")
            return full_email

        print(f"[-] 创建邮箱失败: {resp.text}")
        return None

    def wait_for_email(self, to_email, retry_count=20, sleep_time=3):
        """Poll the mailbox list until a message for ``to_email`` shows up.

        Args:
            to_email: Recipient address to filter on.
            retry_count: Maximum number of polls.
            sleep_time: Seconds to wait between polls.

        Returns:
            ``content`` (or, if that is empty, ``text``) of the first message
            in the reply, or ``None`` when nothing arrived within
            ``retry_count`` polls.
        """
        url = f"{self.base_url}/api/public/emailList"
        payload = {
            "toEmail": to_email,
            "sendName": "Anthropic",
            "num": 1,
            "size": 10,
            "timeSort": "desc",
        }

        print(f"[*] 开始轮询邮件,目标: {to_email}...")

        for attempt in range(1, retry_count + 1):
            try:
                resp = standard_requests.post(
                    url,
                    json=payload,
                    headers=self.headers,
                    timeout=self.request_timeout,
                )
                data = resp.json()

                if data.get('code') == 200 and data.get('data'):
                    # Only the first entry is used; with "timeSort": "desc"
                    # that is presumably the newest message — confirm with the API.
                    for email in data['data']:
                        print(f"[!] 捕获到邮件! 主题: {email.get('subject')}")
                        return email.get('content') or email.get('text')

                print(f"[*] 轮询中 ({attempt}/{retry_count})...")
            except Exception as e:
                # Polling is best-effort: a failed attempt must not abort the wait.
                print(f"[-] 轮询出错: {e}")

            # No point in sleeping after the final attempt.
            if attempt < retry_count:
                time.sleep(sleep_time)

        print("[-] 等待超时,未收到邮件。")
        return None

    def __repr__(self):
        return f"MailSystem({self.base_url}, domains={self.domains})"
|
|
|
|
|
|
class MailPool:
    """Round-robin dispatcher over several mail systems."""

    def __init__(self, mail_configs: list[dict]):
        """Build one ``MailSystem`` per config entry and keep the usable ones.

        Args:
            mail_configs: List of dicts, each with the keys ``base_url``,
                ``admin_email``, ``admin_pass`` and ``domains``.

        Systems whose token could not be obtained are skipped with a message.
        """
        self.systems: "list[MailSystem]" = []
        self._index = 0  # monotonically increasing round-robin cursor
        self._lock = threading.Lock()  # guards _index in next()

        for cfg in mail_configs:
            ms = MailSystem(
                base_url=cfg["base_url"],
                admin_email=cfg["admin_email"],
                admin_password=cfg["admin_pass"],
                domains=cfg["domains"],
            )
            if ms.token:  # only keep systems that authenticated successfully
                self.systems.append(ms)
            else:
                print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}")

        if not self.systems:
            print("[-] 没有可用的邮箱系统!")

        print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)} 个")

    def next(self) -> "MailSystem | None":
        """Return the next system in round-robin order, or ``None`` if empty."""
        with self._lock:
            # The emptiness check sits inside the lock so that the length used
            # for the modulo is the one that was just checked.
            if not self.systems:
                return None
            ms = self.systems[self._index % len(self.systems)]
            self._index += 1
            return ms

    def create_user(self, email_prefix) -> "tuple[str | None, MailSystem | None]":
        """Create a mailbox on the next system in the rotation.

        Returns:
            ``(email, system)`` on success, ``(None, None)`` when the pool is
            empty or the chosen system failed to create the mailbox.
        """
        ms = self.next()
        if not ms:
            print("[-] 没有可用的邮箱系统")
            return None, None

        email = ms.create_user(email_prefix)
        if not email:
            # Keep the documented contract: a failure never yields a system.
            return None, None
        return email, ms

    def get_system_by_domain(self, email: str) -> "MailSystem | None":
        """Return the system whose domain list contains the domain of ``email``.

        The comparison is case-insensitive because domain names are.
        Returns ``None`` when the address has no domain part or no system
        serves that domain.
        """
        _, at_sign, domain = email.rpartition("@")
        domain = domain.strip().lower()
        if not at_sign or not domain:
            return None

        for ms in self.systems:
            if any(domain == candidate.lower() for candidate in ms.domains):
                return ms
        return None

    @property
    def count(self) -> int:
        """Number of usable systems in the pool."""
        return len(self.systems)

    def info(self) -> str:
        """Return a multi-line summary listing every system and its domains."""
        lines = [f"📬 邮箱系统池(共 {self.count} 个):"]
        for i, ms in enumerate(self.systems, 1):
            domains = ", ".join(ms.domains)
            lines.append(f" {i}. {ms.base_url} → [{domains}]")
        return "\n".join(lines)
|
|
|
|
|
|
def extract_magic_link(html_content):
    """Extract the first ``https://claude.ai/...`` link from an HTML body.

    Args:
        html_content: HTML text to search; may be ``None`` or empty.

    Returns:
        The URL found in the first matching double-quoted ``href`` attribute,
        with HTML character references decoded (``&amp;`` -> ``&``), or
        ``None`` when there is no input or no matching link.
    """
    # Imported here so the function does not depend on the module's import block.
    import html

    if not html_content:
        return None

    pattern = r'href="(https://claude\.ai/[^"]+)"'
    match = re.search(pattern, html_content)
    if match is None:
        return None

    # Attribute values are HTML-escaped in the markup; decode them so the
    # query string contains real "&" separators.
    return html.unescape(match.group(1))
|