# autoClaude/mail_service.py

import time
import re
import random
import threading
import requests as standard_requests # 用于普通API交互
class MailSystem:
    """Client for a single mail-system instance that serves several domains.

    Every request carries the API token verbatim in the ``Authorization``
    header and is bounded by ``timeout`` seconds so a stalled server cannot
    block the caller forever.
    """

    def __init__(self, base_url, api_token, domains, timeout=15):
        """Store the connection settings and report whether a token was given.

        Args:
            base_url: Root URL of the mail system; API paths are appended to it.
            api_token: Token sent as the ``Authorization`` header value.
            domains: Domain names this system can create mailboxes under.
            timeout: Seconds allowed for each HTTP request.
        """
        self.base_url = base_url
        self.domains = domains  # domains supported by this system
        self.token = api_token
        self.timeout = timeout
        self.headers = {"Authorization": self.token}
        if self.token:
            print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...")
        else:
            print(f"[-] 邮箱系统 Token 为空 ({self.base_url})")

    def _endpoint(self, name):
        """Return the full URL of the public API method *name*."""
        # Strip a trailing slash so "http://host/" does not yield "//api/...".
        return f"{str(self.base_url).rstrip('/')}/api/public/{name}"

    def create_user(self, email_prefix, domain=None):
        """Register a new mailbox on this system.

        Args:
            email_prefix: Local part of the address to create.
            domain: Domain to use; a random entry of ``self.domains`` when
                omitted.

        Returns:
            The full address on success, or ``None`` when the request fails,
            the response is not JSON, or the API reports a code other than 200.
        """
        if domain is None:
            domain = random.choice(self.domains)
        full_email = f"{email_prefix}@{domain}"
        payload = {
            "list": [
                {
                    "email": full_email,
                    "password": "random_pass_ignoring_this",
                }
            ]
        }
        try:
            resp = standard_requests.post(
                self._endpoint("addUser"),
                json=payload,
                headers=self.headers,
                timeout=self.timeout,
            )
            created = resp.json().get('code') == 200
        except (standard_requests.RequestException, ValueError) as exc:
            # Network failures and non-JSON bodies are reported the same way
            # as an API-level rejection: log and return None.
            print(f"[-] 创建邮箱失败: {exc}")
            return None
        if created:
            print(f"[+] 邮箱用户创建成功: {full_email}")
            return full_email
        print(f"[-] 创建邮箱失败: {resp.text}")
        return None

    def wait_for_email(self, to_email, retry_count=20, sleep_time=3):
        """Poll the mailbox until a message from the expected sender arrives.

        Args:
            to_email: Recipient address to query.
            retry_count: Maximum number of polling attempts.
            sleep_time: Seconds to wait between attempts.

        Returns:
            The ``content`` (or, failing that, ``text``) field of the first
            message in the response, or ``None`` if nothing arrived within
            ``retry_count`` attempts.
        """
        url = self._endpoint("emailList")
        payload = {
            "toEmail": to_email,
            "sendName": "Anthropic",
            "num": 1,
            "size": 10,
            "timeSort": "desc",
        }
        print(f"[*] 开始轮询邮件,目标: {to_email}...")
        for attempt in range(1, retry_count + 1):
            try:
                resp = standard_requests.post(
                    url,
                    json=payload,
                    headers=self.headers,
                    timeout=self.timeout,
                )
                data = resp.json()
                if data.get('code') == 200 and data.get('data'):
                    # The query asks for descending time order, so the first
                    # entry is the one returned.
                    for message in data['data']:
                        print(f"[!] 捕获到邮件! 主题: {message.get('subject')}")
                        return message.get('content') or message.get('text')
                print(f"[*] 轮询中 ({attempt}/{retry_count})...")
            except Exception as e:
                # Polling is best-effort: any failure just costs one attempt.
                print(f"[-] 轮询出错: {e}")
            # No point in sleeping after the final attempt.
            if attempt < retry_count:
                time.sleep(sleep_time)
        print("[-] 等待超时,未收到邮件。")
        return None

    def __repr__(self):
        return f"MailSystem({self.base_url}, domains={self.domains})"
class MailPool:
    """Round-robin dispatcher over several mail systems."""

    def __init__(self, mail_configs: list[dict]):
        """Build one MailSystem per config entry and keep those with a token.

        Args:
            mail_configs: List of dicts, each with ``base_url`` and
                ``domains`` keys and an optional ``api_token`` key
                (the original note refers to this as the
                ``config.MAIL_SYSTEMS`` format).
        """
        self.systems: "list[MailSystem]" = []
        self._index = 0
        self._lock = threading.Lock()
        for cfg in mail_configs:
            ms = MailSystem(
                base_url=cfg["base_url"],
                api_token=cfg.get("api_token", ""),
                domains=cfg["domains"],
            )
            # Only systems that have a non-empty token are usable.
            if ms.token:
                self.systems.append(ms)
            else:
                print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}")
        if not self.systems:
            print("[-] 没有可用的邮箱系统!")
        print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)}")

    def next(self) -> "MailSystem | None":
        """Return the next system in round-robin order, or None if the pool is empty."""
        if not self.systems:
            return None
        # The lock keeps the read-and-increment of the cursor atomic.
        with self._lock:
            ms = self.systems[self._index % len(self.systems)]
            self._index += 1
        return ms

    def create_user(self, email_prefix) -> "tuple[str | None, MailSystem | None]":
        """Create a mailbox on the next system in the rotation.

        Returns:
            ``(email, system)`` where ``system`` is the MailSystem that was
            tried. ``email`` is ``None`` if that system failed to create the
            mailbox. ``(None, None)`` is returned when the pool has no
            systems at all.
        """
        ms = self.next()
        if not ms:
            print("[-] 没有可用的邮箱系统")
            return None, None
        email = ms.create_user(email_prefix)
        return email, ms

    def get_system_by_domain(self, email: str) -> "MailSystem | None":
        """Find the system whose domain list contains the domain of *email*.

        Domain names are compared case-insensitively. Returns ``None`` when
        *email* has no ``@`` or no system serves its domain.
        """
        _, at_sign, domain = email.rpartition("@")
        if not at_sign:
            return None
        wanted = domain.lower()
        for ms in self.systems:
            if any(wanted == str(known).lower() for known in ms.domains):
                return ms
        return None

    @property
    def count(self) -> int:
        """Number of usable systems in the pool."""
        return len(self.systems)

    def info(self) -> str:
        """Return a multi-line summary listing each system and its domains."""
        lines = [f"📬 邮箱系统池(共 {self.count} 个):"]
        for i, ms in enumerate(self.systems, 1):
            domains = ", ".join(ms.domains)
            lines.append(f" {i}. {ms.base_url} → [{domains}]")
        return "\n".join(lines)
def extract_magic_link(html_content):
    """Extract the first ``https://claude.ai/...`` link from an HTML body.

    Args:
        html_content: HTML text to search; may be ``None`` or empty.

    Returns:
        The value of the first double-quoted ``href`` that starts with
        ``https://claude.ai/``, with ``&amp;`` decoded to ``&``, or ``None``
        when there is no input or no such link.
    """
    if not html_content:
        return None
    pattern = r'href="(https://claude\.ai/[^"]+)"'
    match = re.search(pattern, html_content)
    if match is None:
        return None
    # Ampersands inside HTML attributes are written as "&amp;"; turn them
    # back into literal "&" so the query string is usable as a URL.
    return match.group(1).replace("&amp;", "&")