Files
autoClaude-TGbot/core/mail_service.py

198 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import time
import re
import random
import threading
import requests as standard_requests # 用于普通API交互不走代理直连邮件服务器
class MailSystem:
    """Client for one mail-system backend, which may serve several domains.

    Every request carries the API token in the ``Authorization`` header.
    Progress and failures are reported with ``print``; network and response
    errors are caught and turned into a ``None`` result (or an
    ``{"ok": False, ...}`` dict for :meth:`check_health`).
    """

    def __init__(self, base_url, api_token, domains):
        """Store the connection settings and report whether a token was given.

        Args:
            base_url: Root URL of the mail system; API paths are appended to it.
            api_token: Value sent verbatim as the ``Authorization`` header.
            domains: Domains under which this system can create mailboxes.
        """
        self.base_url = base_url
        self.domains = domains
        self.token = api_token
        self.headers = {"Authorization": self.token}
        # No request is made here: a non-empty token is merely reported as
        # "connected", it has not been validated against the server.
        if self.token:
            print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...")
        else:
            print(f"[-] 邮箱系统 Token 为空 ({self.base_url})")

    def create_user(self, email_prefix, domain=None):
        """Register a new mailbox on this system.

        Args:
            email_prefix: Local part of the address (before the ``@``).
            domain: Domain to use; when ``None`` one of ``self.domains`` is
                picked at random.

        Returns:
            The full e-mail address on success, otherwise ``None`` (no domain
            available, rejected token, API-level failure, or request error).
        """
        if domain is None:
            # random.choice raises IndexError on an empty sequence; report the
            # misconfiguration the same way as every other failure instead.
            if not self.domains:
                print(f"[-] 创建邮箱失败: 未配置可用域名 ({self.base_url})")
                return None
            domain = random.choice(self.domains)

        full_email = f"{email_prefix}@{domain}"
        url = f"{self.base_url}/api/public/addUser"
        payload = {
            "list": [
                {
                    "email": full_email,
                    "password": "random_pass_ignoring_this",
                }
            ]
        }
        try:
            resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
            # Check the HTTP status before touching the body: an auth failure
            # may come back without a JSON payload, and parsing it first would
            # hide the real cause behind a decode error.
            if resp.status_code in (401, 403):
                print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
                return None
            if resp.json().get("code") == 200:
                print(f"[+] 邮箱用户创建成功: {full_email}")
                return full_email
            print(f"[-] 创建邮箱失败: {resp.text}")
            return None
        except Exception as e:
            # Deliberate best-effort boundary: any transport or decoding error
            # is reported and mapped to the "failed" return value.
            print(f"[-] 创建邮箱请求异常: {e}")
            return None

    def wait_for_email(self, to_email, retry_count=20, sleep_time=3, stop_check=None,
                       send_name="Anthropic"):
        """Poll the mailbox until a message from ``send_name`` shows up.

        Args:
            to_email: Recipient address to look up.
            retry_count: Maximum number of polling attempts.
            sleep_time: Seconds to wait between two attempts.
            stop_check: Optional zero-argument callable; polling is aborted as
                soon as it returns a truthy value.
            send_name: Sender name sent as the ``sendName`` filter.

        Returns:
            ``content`` (or, if that is empty, ``text``) of the first message in
            the response, or ``None`` when polling was stopped, the token was
            rejected, or no message arrived within ``retry_count`` attempts.
        """
        url = f"{self.base_url}/api/public/emailList"
        payload = {
            "toEmail": to_email,
            "sendName": send_name,
            "num": 1,
            "size": 10,
            "timeSort": "desc",
        }
        print(f"[*] 开始轮询邮件,目标: {to_email}...")
        for attempt in range(1, retry_count + 1):
            # Honour an external stop request before each attempt.
            if stop_check and stop_check():
                print("[!] 收到停止信号,中断邮件轮询")
                return None
            try:
                resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
                # Status first: a rejected token will not recover by retrying,
                # and its body may not be JSON at all.
                if resp.status_code in (401, 403):
                    print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
                    return None
                data = resp.json()
                if data.get("code") == 200 and data.get("data"):
                    # The request sorts by time descending, so the first entry
                    # is the one handed back to the caller.
                    first = data["data"][0]
                    print(f"[!] 捕获到邮件! 主题: {first.get('subject')}")
                    return first.get("content") or first.get("text")
                print(f"[*] 轮询中 ({attempt}/{retry_count})...")
            except Exception as e:
                # Transient failures (network, malformed body) just cost one
                # attempt; polling continues.
                print(f"[-] 轮询出错: {e}")
            # Sleeping after the final attempt would only delay the timeout.
            if attempt < retry_count:
                time.sleep(sleep_time)
        print("[-] 等待超时,未收到邮件。")
        return None

    def check_health(self) -> dict:
        """Probe connectivity and token validity with a minimal list request.

        Returns:
            A dict with ``ok`` (bool) and ``message`` (human-readable status).
        """
        if not self.token:
            return {"ok": False, "message": "Token 未配置"}
        try:
            url = f"{self.base_url}/api/public/emailList"
            payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1}
            resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=10)
            status = resp.status_code
            if status == 200:
                return {"ok": True, "message": "连接正常"}
            if status in (401, 403):
                return {"ok": False, "message": f"Token 无效 (HTTP {status})"}
            return {"ok": False, "message": f"异常响应 (HTTP {status})"}
        # ConnectTimeout is handled before the more general ConnectionError so
        # that a timeout gets its own message.
        except standard_requests.exceptions.ConnectTimeout:
            return {"ok": False, "message": "连接超时"}
        except standard_requests.exceptions.ConnectionError:
            return {"ok": False, "message": "无法连接"}
        except Exception as e:
            return {"ok": False, "message": f"异常: {e}"}

    def __repr__(self):
        """Return a short description with the base URL and domain list."""
        return f"MailSystem({self.base_url}, domains={self.domains})"
class MailPool:
    """Round-robin dispatcher over several :class:`MailSystem` instances."""

    def __init__(self, mail_configs: list[dict]):
        """Build one ``MailSystem`` per config entry and keep the usable ones.

        Args:
            mail_configs: List of dicts, each with ``base_url`` and ``domains``
                and optionally ``api_token`` (the ``config.MAIL_SYSTEMS``
                format).
        """
        self.systems: list[MailSystem] = []
        self._index = 0
        self._lock = threading.Lock()
        for cfg in mail_configs:
            ms = MailSystem(
                base_url=cfg["base_url"],
                api_token=cfg.get("api_token", ""),
                domains=cfg["domains"],
            )
            # A system counts as usable only when it has a non-empty token.
            if ms.token:
                self.systems.append(ms)
            else:
                print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}")
        if not self.systems:
            print("[-] 没有可用的邮箱系统!")
        print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)}")

    def next(self) -> MailSystem | None:
        """Return the next system in round-robin order, or ``None`` if empty."""
        if not self.systems:
            return None
        # The lock keeps the read-and-advance of the cursor atomic.
        with self._lock:
            ms = self.systems[self._index % len(self.systems)]
            self._index += 1
        return ms

    def create_user(self, email_prefix) -> tuple[str | None, MailSystem | None]:
        """Create a mailbox on the next system in the rotation.

        Args:
            email_prefix: Local part of the address to create.

        Returns:
            ``(email, system)`` on success, or ``(None, None)`` when no system
            is available or the chosen system failed to create the mailbox.
        """
        ms = self.next()
        if not ms:
            print("[-] 没有可用的邮箱系统")
            return None, None
        email = ms.create_user(email_prefix)
        # Honour the documented contract: a failed creation yields
        # (None, None), not a system paired with no address.
        if not email:
            return None, None
        return email, ms

    def get_system_by_domain(self, email: str) -> MailSystem | None:
        """Find the system that serves the domain of ``email``.

        The comparison ignores case. An address without ``@`` or with an empty
        domain part matches nothing.
        """
        _, at_sign, domain = email.rpartition("@")
        domain = domain.strip().lower()
        if not at_sign or not domain:
            return None
        for ms in self.systems:
            if any(domain == candidate.lower() for candidate in ms.domains):
                return ms
        return None

    @property
    def count(self) -> int:
        """Number of usable systems in the pool."""
        return len(self.systems)

    def info(self) -> str:
        """Return a multi-line summary of every system and its domains."""
        lines = [f"📬 邮箱系统池(共 {self.count} 个):"]
        for i, ms in enumerate(self.systems, 1):
            domains = ", ".join(ms.domains)
            lines.append(f" {i}. {ms.base_url} → [{domains}]")
        return "\n".join(lines)
def extract_magic_link(html_content):
    """Extract the first ``https://claude.ai/...`` link from an HTML body.

    Args:
        html_content: HTML text of the e-mail; may be ``None`` or empty.

    Returns:
        The URL from the first double-quoted ``href`` that starts with
        ``https://claude.ai/``, with HTML-escaped ampersands decoded, or
        ``None`` when there is no input or no matching link.
    """
    if not html_content:
        return None
    pattern = r'href="(https://claude\.ai/[^"]+)"'
    match = re.search(pattern, html_content)
    if match is None:
        return None
    # Inside an HTML attribute the query-string separator is written as
    # "&amp;"; turn it back into a plain "&" so the URL can be requested.
    # Only this entity is decoded: a general unescape would also rewrite
    # legacy semicolon-less references (e.g. "&not" in "&notify=") that can
    # legitimately appear in a raw query string.
    return match.group(1).replace("&amp;", "&")