# Forked from carrydela/autoClaude (200 lines, 7.1 KiB, Python)
import time
|
|
import re
|
|
import random
|
|
import threading
|
|
import requests as standard_requests # 用于普通API交互
|
|
|
|
from config import get_proxy
|
|
|
|
|
|
class MailSystem:
    """A single mailbox-service instance that can serve several domains.

    Talks to the ``/api/public/addUser`` and ``/api/public/emailList``
    endpoints under ``base_url``. Every request carries the API token in the
    ``Authorization`` header and is sent through the proxy settings returned
    by ``get_proxy()``.
    """

    def __init__(self, base_url, api_token, domains):
        """Store the connection settings and report whether a token is set.

        No network request is made here; the "connected" message only
        reflects that a non-empty token was supplied.

        Args:
            base_url: Root URL of the mailbox service; endpoint paths are
                appended to it as-is.
            api_token: Value sent verbatim as the ``Authorization`` header.
            domains: Domains this system can create addresses under.
        """
        self.base_url = base_url
        self.domains = domains  # domains served by this system
        self.token = api_token
        self.headers = {"Authorization": self.token}
        if self.token:
            print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...")
        else:
            print(f"[-] 邮箱系统 Token 为空 ({self.base_url})")

    def create_user(self, email_prefix, domain=None):
        """Register a new mailbox user on this system.

        Args:
            email_prefix: Local part of the address (the text before ``@``).
            domain: Domain to use; when ``None`` one of ``self.domains`` is
                picked at random.

        Returns:
            The full email address on success, otherwise ``None`` (no domain
            available, rejected token, non-200 API code, or request error).
        """
        if domain is None:
            if not self.domains:
                # random.choice() would raise IndexError on an empty list.
                print(f"[-] 创建邮箱失败: 未配置可用域名 ({self.base_url})")
                return None
            domain = random.choice(self.domains)

        full_email = f"{email_prefix}@{domain}"
        url = f"{self.base_url}/api/public/addUser"
        payload = {
            "list": [
                {
                    "email": full_email,
                    # Fixed placeholder; this module never uses the password
                    # afterwards.
                    "password": "random_pass_ignoring_this",
                }
            ]
        }
        try:
            resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15)
            # Look at the HTTP status before touching the body: an auth
            # rejection may not carry JSON, and parsing it first would hide
            # the real cause behind a decode error.
            if resp.status_code in (401, 403):
                print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
                return None
            if resp.json().get('code') == 200:
                print(f"[+] 邮箱用户创建成功: {full_email}")
                return full_email
            print(f"[-] 创建邮箱失败: {resp.text}")
            return None
        except Exception as e:
            # Best-effort API: network and decode errors are reported, not raised.
            print(f"[-] 创建邮箱请求异常: {e}")
            return None

    def wait_for_email(self, to_email, retry_count=20, sleep_time=3, stop_check=None):
        """Poll the mailbox until a message arrives; supports external interruption.

        Args:
            to_email: Recipient address to look up.
            retry_count: Maximum number of polling attempts.
            sleep_time: Seconds to wait after each unsuccessful attempt.
            stop_check: Optional zero-argument callable, evaluated before
                every attempt; a truthy result aborts the wait.

        Returns:
            The ``content`` (or, failing that, ``text``) field of the first
            message in the response, or ``None`` when interrupted, when the
            token is rejected, or when no message arrived in time.
        """
        url = f"{self.base_url}/api/public/emailList"
        payload = {
            "toEmail": to_email,
            "sendName": "Anthropic",
            "num": 1,
            "size": 10,
            "timeSort": "desc",
        }

        print(f"[*] 开始轮询邮件,目标: {to_email}...")

        for attempt in range(retry_count):
            # Honour the external stop signal before doing any network work.
            if stop_check and stop_check():
                print("[!] 收到停止信号,中断邮件轮询")
                return None

            try:
                resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15)

                # Status first: a rejected token will never start working, so
                # stop immediately instead of failing to parse the body and
                # retrying until the budget is exhausted.
                if resp.status_code in (401, 403):
                    print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
                    return None

                data = resp.json()
                if data.get('code') == 200 and data.get('data'):
                    # Only the first entry is used; with "timeSort": "desc"
                    # this is presumably the newest message.
                    first = next(iter(data['data']))
                    print(f"[!] 捕获到邮件! 主题: {first.get('subject')}")
                    return first.get('content') or first.get('text')

                print(f"[*] 轮询中 ({attempt + 1}/{retry_count})...")
            except Exception as e:
                # Transient failures are logged and retried on the next round.
                print(f"[-] 轮询出错: {e}")

            time.sleep(sleep_time)

        print("[-] 等待超时,未收到邮件。")
        return None

    def check_health(self) -> dict:
        """Check connectivity and token validity for this mailbox system.

        Returns:
            A dict with ``ok`` (bool) and ``message`` (str). ``ok`` is True
            only when the probe request answers with HTTP 200.
        """
        if not self.token:
            return {"ok": False, "message": "Token 未配置"}
        try:
            url = f"{self.base_url}/api/public/emailList"
            payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1}
            resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=10)
            if resp.status_code == 200:
                return {"ok": True, "message": "连接正常"}
            if resp.status_code in (401, 403):
                return {"ok": False, "message": f"Token 无效 (HTTP {resp.status_code})"}
            return {"ok": False, "message": f"异常响应 (HTTP {resp.status_code})"}
        # ConnectTimeout must be listed before ConnectionError: in requests it
        # is a subclass of ConnectionError and would otherwise be shadowed.
        except standard_requests.exceptions.ConnectTimeout:
            return {"ok": False, "message": "连接超时"}
        except standard_requests.exceptions.ConnectionError:
            return {"ok": False, "message": "无法连接"}
        except Exception as e:
            return {"ok": False, "message": f"异常: {e}"}

    def __repr__(self):
        """Return a short description with the base URL and served domains."""
        return f"MailSystem({self.base_url}, domains={self.domains})"
|
|
|
|
|
|
class MailPool:
    """Round-robin scheduler over several mailbox systems."""

    def __init__(self, mail_configs: list[dict]):
        """Build one MailSystem per config entry and keep those with a token.

        Args:
            mail_configs: List in the ``config.MAIL_SYSTEMS`` format. Each
                entry is read for ``base_url`` and ``domains`` (required) and
                ``api_token`` (optional, defaults to an empty string).
        """
        self.systems: list[MailSystem] = []
        self._index = 0  # monotonically increasing round-robin cursor
        self._lock = threading.Lock()  # guards _index in next()

        for cfg in mail_configs:
            ms = MailSystem(
                base_url=cfg["base_url"],
                api_token=cfg.get("api_token", ""),
                domains=cfg["domains"],
            )
            # Only systems with a non-empty token are usable.
            if ms.token:
                self.systems.append(ms)
            else:
                print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}")

        if not self.systems:
            print("[-] 没有可用的邮箱系统!")

        print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)} 个")

    def next(self) -> "MailSystem | None":
        """Return the next system in round-robin order, or None if the pool is empty."""
        if not self.systems:
            return None
        with self._lock:
            ms = self.systems[self._index % len(self.systems)]
            self._index += 1
        return ms

    def create_user(self, email_prefix) -> "tuple[str | None, MailSystem | None]":
        """Create a user on the next system in the rotation.

        Args:
            email_prefix: Local part of the address to create.

        Returns:
            ``(email, system)`` when the user was created, otherwise
            ``(None, None)`` — both when the pool is empty and when the
            selected system failed to create the user.
        """
        ms = self.next()
        if ms is None:
            print("[-] 没有可用的邮箱系统")
            return None, None
        email = ms.create_user(email_prefix)
        if not email:
            # Keep the documented contract: never hand back a system without
            # a successfully created address.
            return None, None
        return email, ms

    def get_system_by_domain(self, email: str) -> "MailSystem | None":
        """Find the system that serves the domain of ``email``.

        The comparison ignores case. Returns None when ``email`` has no
        ``@`` or no system lists its domain.
        """
        if "@" not in email:
            return None
        domain = email.split("@")[-1].lower()
        for ms in self.systems:
            if any(domain == known.lower() for known in ms.domains):
                return ms
        return None

    @property
    def count(self) -> int:
        """Number of usable systems in the pool."""
        return len(self.systems)

    def info(self) -> str:
        """Return a multi-line summary of every system and its domains."""
        lines = [f"📬 邮箱系统池(共 {self.count} 个):"]
        for i, ms in enumerate(self.systems, 1):
            domains = ", ".join(ms.domains)
            lines.append(f" {i}. {ms.base_url} → [{domains}]")
        return "\n".join(lines)
|
|
|
|
|
|
def extract_magic_link(html_content):
    """Extract the first ``https://claude.ai/`` link from an HTML email body.

    Args:
        html_content: HTML text of the email; may be ``None`` or empty.

    Returns:
        The URL from the first matching ``href="..."`` attribute with HTML
        entities decoded (e.g. ``&amp;`` becomes ``&``), or ``None`` when
        there is no content or no matching link.
    """
    # Function-scope import keeps this block self-contained.
    from html import unescape

    if not html_content:
        return None

    pattern = r'href="(https://claude\.ai/[^"]+)"'
    match = re.search(pattern, html_content)
    if match is None:
        return None

    # Attribute values are HTML-escaped in the markup; decode them so the
    # query string separators are real "&" characters again.
    return unescape(match.group(1))
|