Files
AutoDoneTeam/modules/tempmail.py
dela f7f45dbeff feat: 添加批量注册和多域名支持
新功能:
- 新增批量自动注册脚本 auto_register.py
- 新增临时邮箱客户端 tempmail.py
- 支持选择多个邮箱域名后缀(domain_index)
- 自动生成临时邮箱并完成注册流程
- 成功注册的邮箱保留,失败的自动删除

配置改进:
- 创建 config.example.py 模板
- config.py 加入 .gitignore 保护敏感信息
- 新增 domain_index 配置项

文档更新:
- 更新 README.md,添加多域名配置说明
- 添加配置文件安全说明
- 完善快速开始指南

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2026-01-10 18:55:03 +08:00

451 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""临时邮箱客户端 - 用于自动接收验证邮件"""
import time
import re
import requests
from typing import Optional, Dict, List
from config import DEBUG
class TempMailClient:
    """HTTP client for a temporary-mailbox service.

    Covers the operations needed to receive a verification e-mail:

    - generate a temporary mailbox,
    - poll the mailbox for incoming messages,
    - extract a verification code or verification link from a message,
    - delete a mailbox or purge its messages.

    Authentication is either a cookie session obtained through
    ``/api/login`` or a bearer token sent in the ``Authorization`` header.
    """

    # Sent with every request.
    _USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0'
    # Per-request timeout in seconds.
    _REQUEST_TIMEOUT = 10

    # Tried in order: keyword-anchored patterns first, so an unrelated
    # six-digit number elsewhere in the message cannot win over the real code.
    _CODE_PATTERNS = (
        re.compile(r'(?:code|verification|otp)[:\s]+(\d{6})(?!\d)', re.IGNORECASE),
        re.compile(r'\bis[:\s]+(\d{6})(?!\d)', re.IGNORECASE),
        # Stand-alone six digits; the '#' guard skips hex colours like #333333.
        re.compile(r'(?<![#\w])(\d{6})(?!\w)'),
    )

    _LINK_PATTERNS = (
        re.compile(r'https?://auth\.openai\.com/[^\s<>"\']+', re.IGNORECASE),
        # '*' (not '+') before "verify" so that ".com/verify?..." matches too.
        re.compile(r'https?://chatgpt\.com/[^\s<>"\']*verify[^\s<>"\']*', re.IGNORECASE),
    )

    _HTML_HIDDEN_RE = re.compile(
        r'<(script|style)\b[^>]*>.*?</\1\s*>', re.IGNORECASE | re.DOTALL
    )
    _HTML_TAG_RE = re.compile(r'<[^>]+>')

    def __init__(self, api_base_url: str, username: Optional[str] = None,
                 password: Optional[str] = None, admin_token: Optional[str] = None):
        """Create a client and authenticate.

        Username + password login takes precedence; ``admin_token`` is only
        used when no username/password pair is supplied.

        Args:
            api_base_url: Base URL of the API, e.g. ``https://your.domain``.
            username: Login user name (cookie-session authentication).
            password: Login password (cookie-session authentication).
            admin_token: Token sent as ``Authorization: Bearer <token>``.

        Raises:
            ValueError: Neither a username/password pair nor a token was given.
            Exception: The login request failed (see ``_login``).
        """
        has_credentials = bool(username and password)
        # Validate before allocating a session or touching the network.
        if not has_credentials and not admin_token:
            raise ValueError("Must provide either (username, password) or admin_token")

        self.base_url = api_base_url.rstrip('/')
        self.session = requests.Session()
        self.admin_token = None
        self.use_token_auth = False

        if has_credentials:
            if DEBUG:
                print(f"[TempMail] Logging in as: {username}")
            self._login(username, password)
        else:
            if DEBUG:
                print("[TempMail] Using admin token authentication")
            self.admin_token = admin_token
            self.use_token_auth = True

    def _login(self, username: str, password: str):
        """Log in with user name and password; the session keeps the cookie.

        Args:
            username: Login user name.
            password: Login password.

        Raises:
            RuntimeError: Non-200 status, or the response lacks a truthy
                ``success`` field.
            Exception: Any transport or JSON decoding error, re-raised as is.
        """
        url = f"{self.base_url}/api/login"
        try:
            resp = self.session.post(
                url,
                json={
                    'username': username,
                    'password': password,
                },
                headers={
                    'User-Agent': self._USER_AGENT,
                    'Accept': 'application/json',
                    'Content-Type': 'application/json',
                },
                timeout=self._REQUEST_TIMEOUT,
            )
            if resp.status_code != 200:
                raise RuntimeError(f"Login failed: {resp.status_code} {resp.text}")
            data = resp.json()
            if not data.get('success'):
                raise RuntimeError(f"Login failed: {data}")
            if DEBUG:
                print("[TempMail] ✓ Logged in successfully")
                print(f"[TempMail] Role: {data.get('role')}")
                print(f"[TempMail] Mailbox limit: {data.get('mailbox_limit')}")
        except Exception as e:
            if DEBUG:
                print(f"[TempMail] ✗ Login failed: {e}")
            raise

    def _get_headers(self) -> Dict[str, str]:
        """Build the headers for an API request.

        With token authentication an ``Authorization`` header is added. With
        cookie authentication nothing extra is needed because the session
        carries the cookie.
        """
        headers = {
            'User-Agent': self._USER_AGENT,
            'Accept': 'application/json',
        }
        if self.use_token_auth and self.admin_token:
            headers['Authorization'] = f'Bearer {self.admin_token}'
        return headers

    @staticmethod
    def _format_expiry(expires) -> str:
        """Render an expiry value for debug output without ever raising."""
        try:
            return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(expires))
        except (TypeError, ValueError, OverflowError, OSError):
            # Not an epoch-seconds number; show the raw value instead.
            return str(expires)

    @classmethod
    def _visible_text(cls, markup: str) -> str:
        """Reduce HTML to its visible text: drop style/script blocks and tags."""
        from html import unescape

        without_hidden = cls._HTML_HIDDEN_RE.sub(' ', markup)
        return unescape(cls._HTML_TAG_RE.sub(' ', without_hidden))

    @staticmethod
    def _deleted_count(resp) -> int:
        """Read ``deletedCount`` from a response body, or 0 if unavailable."""
        try:
            data = resp.json()
        except ValueError:
            return 0
        return data.get('deletedCount', 0) if isinstance(data, dict) else 0

    def generate_mailbox(self, length: int = 10, domain_index: int = 0) -> str:
        """Generate a new temporary mailbox.

        Args:
            length: Length of the mailbox local part (default 10).
            domain_index: Index of the domain suffix to use (default 0).

        Returns:
            The mailbox address, e.g. ``random@domain.com``.

        Raises:
            RuntimeError: Non-200 status, or the response has no ``email``.
            Exception: Any transport or JSON decoding error, re-raised as is.
        """
        if DEBUG:
            print("\n[TempMail] Generating new mailbox...")
        url = f"{self.base_url}/api/generate"
        params = {
            'length': length,
            'domainIndex': domain_index,
        }
        try:
            resp = self.session.get(
                url, params=params, headers=self._get_headers(),
                timeout=self._REQUEST_TIMEOUT,
            )
            if resp.status_code != 200:
                raise RuntimeError(f"Failed to generate mailbox: {resp.status_code} {resp.text}")
            data = resp.json()
            email = data.get('email')
            expires = data.get('expires')
            if not email:
                raise RuntimeError(f"No email in response: {data}")
            if DEBUG:
                print(f"[TempMail] ✓ Generated: {email}")
                if expires:
                    # Formatting is tolerant so a debug line cannot fail the call.
                    print(f"[TempMail] Expires: {self._format_expiry(expires)}")
            return email
        except Exception as e:
            if DEBUG:
                print(f"[TempMail] ✗ Failed to generate mailbox: {e}")
            raise

    def get_emails(self, mailbox: str) -> List[Dict]:
        """List the messages in a mailbox (best effort).

        Args:
            mailbox: Mailbox address.

        Returns:
            The decoded JSON list of messages, or an empty list on a non-200
            status, a non-list body, or any exception.
        """
        url = f"{self.base_url}/api/emails"
        params = {'mailbox': mailbox}
        try:
            resp = self.session.get(
                url, params=params, headers=self._get_headers(),
                timeout=self._REQUEST_TIMEOUT,
            )
            if resp.status_code != 200:
                if DEBUG:
                    print(f"[TempMail] Failed to get emails: {resp.status_code}")
                return []
            emails = resp.json()
            return emails if isinstance(emails, list) else []
        except Exception as e:
            # Deliberately swallowed: callers poll and simply retry.
            if DEBUG:
                print(f"[TempMail] Exception while getting emails: {e}")
            return []

    def get_email_detail(self, email_id: str) -> Dict:
        """Fetch one message in full.

        Args:
            email_id: Message ID as returned by ``get_emails``.

        Returns:
            The decoded JSON body of the message-detail endpoint.

        Raises:
            RuntimeError: Non-200 status.
            Exception: Any transport or JSON decoding error, re-raised as is.
        """
        url = f"{self.base_url}/api/email/{email_id}"
        try:
            resp = self.session.get(
                url, headers=self._get_headers(), timeout=self._REQUEST_TIMEOUT
            )
            if resp.status_code != 200:
                raise RuntimeError(f"Failed to get email detail: {resp.status_code}")
            return resp.json()
        except Exception as e:
            if DEBUG:
                print(f"[TempMail] Failed to get email detail: {e}")
            raise

    def wait_for_email(
        self,
        mailbox: str,
        from_filter: Optional[str] = None,
        subject_filter: Optional[str] = None,
        timeout: int = 120,
        interval: int = 5
    ) -> Optional[Dict]:
        """Poll a mailbox until a matching message arrives.

        Both filters are case-insensitive substring tests. A failure while
        fetching the matching message's detail is treated as transient and
        retried on the next poll.

        Args:
            mailbox: Mailbox address.
            from_filter: Substring the sender must contain, e.g. ``openai.com``.
            subject_filter: Substring the subject must contain, e.g. ``Verify``.
            timeout: Maximum time to wait, in seconds.
            interval: Delay between polls, in seconds.

        Returns:
            The full message detail, or ``None`` if nothing matched in time.
        """
        if DEBUG:
            print("\n[TempMail] Waiting for email...")
            print(f"[TempMail] Mailbox: {mailbox}")
            if from_filter:
                print(f"[TempMail] From filter: *{from_filter}")
            if subject_filter:
                print(f"[TempMail] Subject filter: *{subject_filter}*")
            print(f"[TempMail] Timeout: {timeout}s, Interval: {interval}s")

        wanted_from = from_filter.lower() if from_filter else None
        wanted_subject = subject_filter.lower() if subject_filter else None

        # Monotonic clock: immune to wall-clock adjustments during the wait.
        started = time.monotonic()
        attempts = 0
        while time.monotonic() - started < timeout:
            attempts += 1
            emails = self.get_emails(mailbox)
            if DEBUG and attempts == 1:
                print(f"[TempMail] Polling... (attempt {attempts}, {len(emails)} emails)")
            elif DEBUG and attempts % 3 == 0:  # report every third poll
                elapsed = int(time.monotonic() - started)
                print(f"[TempMail] Still waiting... ({elapsed}s elapsed, {len(emails)} emails)")

            for email in emails:
                if not isinstance(email, dict):
                    continue
                # Per-message diagnostics only on the first two polls.
                verbose = DEBUG and attempts <= 2
                if verbose:
                    print("[TempMail] Checking email:")
                    print(f"[TempMail] From: {email.get('from')}")
                    print(f"[TempMail] Subject: {email.get('subject')}")

                # `or ''` also covers an explicit null in the payload.
                sender = str(email.get('from') or '').lower()
                if wanted_from and wanted_from not in sender:
                    if verbose:
                        print(f"[TempMail] ✗ From filter mismatch (expected *{from_filter})")
                    continue

                subject = str(email.get('subject') or '').lower()
                if wanted_subject and wanted_subject not in subject:
                    if verbose:
                        print(f"[TempMail] ✗ Subject filter mismatch (expected *{subject_filter}*)")
                    continue

                if DEBUG:
                    print("[TempMail] ✓ Found matching email:")
                    print(f"[TempMail] From: {email.get('from')}")
                    print(f"[TempMail] Subject: {email.get('subject')}")
                try:
                    return self.get_email_detail(email['id'])
                except Exception as e:
                    # One failed fetch should not abort the wait; poll again.
                    if DEBUG:
                        print(f"[TempMail] ✗ Could not fetch matching email, will retry: {e}")
                    break

            # Never sleep past the deadline.
            remaining = timeout - (time.monotonic() - started)
            if remaining <= 0:
                break
            time.sleep(min(interval, remaining))

        if DEBUG:
            print(f"[TempMail] ✗ Timeout after {timeout}s")
        return None

    def extract_verification_code(self, email_data: Dict) -> Optional[str]:
        """Extract a six-digit verification code from a message.

        The plain-text body and the visible text of the HTML body are searched
        together. Codes introduced by a keyword ("code: 123456",
        "verification: 123456", "otp: 123456", "... is 123456") are preferred;
        a stand-alone six-digit number is the fallback.

        Args:
            email_data: Message detail; the body is read from ``text`` or
                ``content`` and from ``html`` or ``html_content``.

        Returns:
            The code as a string, or ``None`` if none was found.
        """
        if DEBUG:
            print("\n[TempMail] Extracting verification code...")
        # Accept either field-name variant; null values become ''.
        text_content = str(email_data.get('text') or email_data.get('content') or '')
        html_content = str(email_data.get('html') or email_data.get('html_content') or '')

        # Search visible HTML text only, so CSS colours and attribute values
        # are not mistaken for a code.
        searchable = f"{text_content}\n{self._visible_text(html_content)}"
        for pattern in self._CODE_PATTERNS:
            match = pattern.search(searchable)
            if match:
                code = match.group(1)
                if DEBUG:
                    print(f"[TempMail] ✓ Found code: {code}")
                return code

        if DEBUG:
            print("[TempMail] ✗ No verification code found")
            print(f"[TempMail] Text preview: {text_content[:200]}")
            print(f"[TempMail] HTML preview: {html_content[:200]}")
        return None

    def extract_verification_link(self, email_data: Dict) -> Optional[str]:
        """Extract an OpenAI/ChatGPT verification link from a message.

        Recognised forms:

        - ``https://auth.openai.com/...``
        - ``https://chatgpt.com/...verify...``

        Args:
            email_data: Message detail; the body is read from ``text`` or
                ``content`` and from ``html`` or ``html_content``.

        Returns:
            The link with HTML entities decoded and trailing sentence
            punctuation removed, or ``None`` if none was found.
        """
        from html import unescape

        if DEBUG:
            print("\n[TempMail] Extracting verification link...")
        # Same field fallbacks as extract_verification_code.
        text_content = str(email_data.get('text') or email_data.get('content') or '')
        html_content = str(email_data.get('html') or email_data.get('html_content') or '')
        combined = f"{text_content}\n{html_content}"

        for pattern in self._LINK_PATTERNS:
            match = pattern.search(combined)
            if match:
                # href values carry '&amp;'; plain text may end the sentence
                # right after the URL.
                link = unescape(match.group(0)).rstrip('.,;:!?)')
                if DEBUG:
                    print(f"[TempMail] ✓ Found link: {link}")
                return link

        if DEBUG:
            print("[TempMail] ✗ No verification link found")
        return None

    def delete_mailbox(self, mailbox: str) -> bool:
        """Delete a mailbox (used to clean up after a failure).

        Args:
            mailbox: Mailbox address.

        Returns:
            ``True`` on status 200 or 204; ``False`` on any other status or
            on an exception.
        """
        if DEBUG:
            print(f"\n[TempMail] Deleting mailbox: {mailbox}")
        # The address goes in the query string, not in the path.
        url = f"{self.base_url}/api/mailboxes"
        params = {'address': mailbox}
        try:
            resp = self.session.delete(
                url, params=params, headers=self._get_headers(),
                timeout=self._REQUEST_TIMEOUT,
            )
            if resp.status_code in (200, 204):
                if DEBUG:
                    print("[TempMail] ✓ Mailbox deleted")
                return True
            if DEBUG:
                print(f"[TempMail] ✗ Failed to delete mailbox: {resp.status_code}")
            return False
        except Exception as e:
            if DEBUG:
                print(f"[TempMail] ✗ Exception during deletion: {e}")
            return False

    def cleanup_mailbox(self, mailbox: str) -> bool:
        """Delete every message in a mailbox, keeping the mailbox itself.

        Args:
            mailbox: Mailbox address.

        Returns:
            ``True`` on status 200; ``False`` on any other status or on an
            exception.
        """
        if DEBUG:
            print(f"\n[TempMail] Cleaning up mailbox emails: {mailbox}")
        url = f"{self.base_url}/api/emails"
        params = {'mailbox': mailbox}
        try:
            resp = self.session.delete(
                url, params=params, headers=self._get_headers(),
                timeout=self._REQUEST_TIMEOUT,
            )
            if resp.status_code != 200:
                if DEBUG:
                    print(f"[TempMail] ✗ Failed to cleanup: {resp.status_code}")
                return False
            if DEBUG:
                # The count is informational; an unparsable body is still success.
                print(f"[TempMail] ✓ Deleted {self._deleted_count(resp)} emails")
            return True
        except Exception as e:
            if DEBUG:
                print(f"[TempMail] ✗ Exception during cleanup: {e}")
            return False