forked from carrydela/autoClaude
198 lines
7.1 KiB
Python
198 lines
7.1 KiB
Python
import time
|
||
import re
|
||
import random
|
||
import threading
|
||
import requests as standard_requests # 用于普通API交互(不走代理,直连邮件服务器)
|
||
|
||
|
||
class MailSystem:
|
||
"""单个邮箱系统实例,支持多域名"""
|
||
|
||
def __init__(self, base_url, api_token, domains):
|
||
self.base_url = base_url
|
||
self.domains = domains # 该系统支持的域名列表
|
||
self.token = api_token
|
||
self.headers = {"Authorization": self.token}
|
||
if self.token:
|
||
print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...")
|
||
else:
|
||
print(f"[-] 邮箱系统 Token 为空 ({self.base_url})")
|
||
|
||
def create_user(self, email_prefix, domain=None):
|
||
"""在系统里注册一个新邮箱用户"""
|
||
if domain is None:
|
||
domain = random.choice(self.domains)
|
||
full_email = f"{email_prefix}@{domain}"
|
||
url = f"{self.base_url}/api/public/addUser"
|
||
payload = {
|
||
"list": [
|
||
{
|
||
"email": full_email,
|
||
"password": "random_pass_ignoring_this"
|
||
}
|
||
]
|
||
}
|
||
try:
|
||
resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
|
||
if resp.json().get('code') == 200:
|
||
print(f"[+] 邮箱用户创建成功: {full_email}")
|
||
return full_email
|
||
elif resp.status_code in (401, 403):
|
||
print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
|
||
return None
|
||
else:
|
||
print(f"[-] 创建邮箱失败: {resp.text}")
|
||
return None
|
||
except Exception as e:
|
||
print(f"[-] 创建邮箱请求异常: {e}")
|
||
return None
|
||
|
||
def wait_for_email(self, to_email, retry_count=20, sleep_time=3, stop_check=None):
|
||
"""像猎人一样耐心等待猎物出现,支持外部中断"""
|
||
url = f"{self.base_url}/api/public/emailList"
|
||
payload = {
|
||
"toEmail": to_email,
|
||
"sendName": "Anthropic",
|
||
"num": 1,
|
||
"size": 10,
|
||
"timeSort": "desc"
|
||
}
|
||
|
||
print(f"[*] 开始轮询邮件,目标: {to_email}...")
|
||
|
||
for i in range(retry_count):
|
||
# 检查外部中断信号
|
||
if stop_check and stop_check():
|
||
print("[!] 收到停止信号,中断邮件轮询")
|
||
return None
|
||
|
||
try:
|
||
resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
|
||
data = resp.json()
|
||
|
||
if resp.status_code in (401, 403):
|
||
print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
|
||
return None
|
||
|
||
if data.get('code') == 200 and data.get('data'):
|
||
emails = data['data']
|
||
for email in emails:
|
||
print(f"[!] 捕获到邮件! 主题: {email.get('subject')}")
|
||
return email.get('content') or email.get('text')
|
||
|
||
print(f"[*] 轮询中 ({i+1}/{retry_count})...")
|
||
time.sleep(sleep_time)
|
||
except Exception as e:
|
||
print(f"[-] 轮询出错: {e}")
|
||
time.sleep(sleep_time)
|
||
|
||
print("[-] 等待超时,未收到邮件。")
|
||
return None
|
||
|
||
def check_health(self) -> dict:
|
||
"""检查该邮箱系统的连通性和 Token 有效性"""
|
||
if not self.token:
|
||
return {"ok": False, "message": "Token 未配置"}
|
||
try:
|
||
url = f"{self.base_url}/api/public/emailList"
|
||
payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1}
|
||
resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=10)
|
||
if resp.status_code == 200:
|
||
return {"ok": True, "message": "连接正常"}
|
||
elif resp.status_code in (401, 403):
|
||
return {"ok": False, "message": f"Token 无效 (HTTP {resp.status_code})"}
|
||
else:
|
||
return {"ok": False, "message": f"异常响应 (HTTP {resp.status_code})"}
|
||
except standard_requests.exceptions.ConnectTimeout:
|
||
return {"ok": False, "message": "连接超时"}
|
||
except standard_requests.exceptions.ConnectionError:
|
||
return {"ok": False, "message": "无法连接"}
|
||
except Exception as e:
|
||
return {"ok": False, "message": f"异常: {e}"}
|
||
|
||
def __repr__(self):
|
||
return f"MailSystem({self.base_url}, domains={self.domains})"
|
||
|
||
|
||
class MailPool:
|
||
"""多邮箱系统轮询调度器"""
|
||
|
||
def __init__(self, mail_configs: list[dict]):
|
||
"""
|
||
mail_configs: config.MAIL_SYSTEMS 格式的列表
|
||
"""
|
||
self.systems: list[MailSystem] = []
|
||
self._index = 0
|
||
self._lock = threading.Lock()
|
||
|
||
for cfg in mail_configs:
|
||
ms = MailSystem(
|
||
base_url=cfg["base_url"],
|
||
api_token=cfg.get("api_token", ""),
|
||
domains=cfg["domains"],
|
||
)
|
||
if ms.token: # 只添加连接成功的系统
|
||
self.systems.append(ms)
|
||
else:
|
||
print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}")
|
||
|
||
if not self.systems:
|
||
print("[-] 没有可用的邮箱系统!")
|
||
|
||
print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)} 个")
|
||
|
||
def next(self) -> MailSystem | None:
|
||
"""Round-robin 返回下一个系统"""
|
||
if not self.systems:
|
||
return None
|
||
with self._lock:
|
||
ms = self.systems[self._index % len(self.systems)]
|
||
self._index += 1
|
||
return ms
|
||
|
||
def create_user(self, email_prefix) -> tuple[str | None, MailSystem | None]:
|
||
"""
|
||
使用下一个系统创建用户。
|
||
返回 (email, 对应的 MailSystem) 或 (None, None)。
|
||
"""
|
||
ms = self.next()
|
||
if not ms:
|
||
print("[-] 没有可用的邮箱系统")
|
||
return None, None
|
||
email = ms.create_user(email_prefix)
|
||
return email, ms
|
||
|
||
def get_system_by_domain(self, email: str) -> MailSystem | None:
|
||
"""根据邮箱域名找到对应的 MailSystem"""
|
||
domain = email.split("@")[-1] if "@" in email else ""
|
||
for ms in self.systems:
|
||
if domain in ms.domains:
|
||
return ms
|
||
return None
|
||
|
||
@property
|
||
def count(self) -> int:
|
||
return len(self.systems)
|
||
|
||
def info(self) -> str:
|
||
"""返回所有系统的信息摘要"""
|
||
lines = [f"📬 邮箱系统池(共 {self.count} 个):"]
|
||
for i, ms in enumerate(self.systems, 1):
|
||
domains = ", ".join(ms.domains)
|
||
lines.append(f" {i}. {ms.base_url} → [{domains}]")
|
||
return "\n".join(lines)
|
||
|
||
|
||
def extract_magic_link(html_content):
|
||
"""HTML提取出链接"""
|
||
if not html_content:
|
||
return None
|
||
|
||
pattern = r'href="(https://claude\.ai/[^"]+)"'
|
||
match = re.search(pattern, html_content)
|
||
|
||
if match:
|
||
url = match.group(1)
|
||
return url.replace("&", "&")
|
||
return None
|