Files
codexTool/email_service.py
kyx236 b50ad199de feat(email_service): Add dynamic email provider configuration support
- Import EMAIL_PROVIDER from config at runtime in unified functions to enable dynamic provider switching
- Update unified_generate_email() to fetch current provider configuration on each call
- Update unified_create_email() to fetch current provider configuration on each call
- Update unified_get_verification_code() to fetch current provider configuration on each call
- Update unified_fetch_emails() to fetch current provider configuration on each call
- Update fallback comments to reflect support for both KYX and Cloud Mail systems
- Allows email provider to be switched without restarting the application
2026-01-30 16:02:00 +08:00

719 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ==================== 邮箱服务模块 ====================
# 处理邮箱创建、验证码获取等功能 (支持多种邮箱系统)
import re
import time
import random
import string
import requests
from typing import Callable, TypeVar, Optional, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import (
EMAIL_API_BASE,
EMAIL_API_AUTH,
EMAIL_ROLE,
DEFAULT_PASSWORD,
REQUEST_TIMEOUT,
VERIFICATION_CODE_INTERVAL,
VERIFICATION_CODE_MAX_RETRIES,
get_random_domain,
EMAIL_PROVIDER,
GPTMAIL_API_BASE,
GPTMAIL_PREFIX,
GPTMAIL_DOMAINS,
get_random_gptmail_domain,
get_next_gptmail_key,
get_gptmail_keys,
get_cloudmail_api_base,
)
from logger import log
def create_session_with_retry():
"""创建带重试机制的 HTTP Session"""
session = requests.Session()
retry_strategy = Retry(
total=5,
backoff_factor=1,
status_forcelist=[429, 500, 502, 503, 504],
allowed_methods=["HEAD", "GET", "POST", "OPTIONS"]
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
# 全局 HTTP Session
http_session = create_session_with_retry()
# ==================== 通用轮询重试工具 ====================
T = TypeVar('T')
class PollResult:
"""轮询结果"""
def __init__(self, success: bool, data: Any = None, error: str = None):
self.success = success
self.data = data
self.error = error
def poll_with_retry(
fetch_func: Callable[[], Optional[T]],
check_func: Callable[[T], Optional[Any]],
max_retries: int = None,
interval: int = None,
fast_retries: int = 5,
fast_interval: int = 1,
description: str = "轮询",
on_progress: Callable[[float], None] = None,
) -> PollResult:
"""通用轮询重试函数
Args:
fetch_func: 获取数据的函数,返回原始数据或 None
check_func: 检查数据的函数,返回提取的结果或 None
max_retries: 最大重试次数
interval: 正常轮询间隔 (秒)
fast_retries: 快速轮询次数 (前 N 次使用快速间隔)
fast_interval: 快速轮询间隔 (秒)
description: 描述信息 (用于日志)
on_progress: 进度回调函数,参数为已用时间 (秒)
Returns:
PollResult: 轮询结果
"""
if max_retries is None:
max_retries = VERIFICATION_CODE_MAX_RETRIES
if interval is None:
interval = VERIFICATION_CODE_INTERVAL
start_time = time.time()
progress_shown = False
for i in range(max_retries):
try:
# 获取数据
data = fetch_func()
if data is not None:
# 检查数据
result = check_func(data)
if result is not None:
if progress_shown:
log.progress_clear()
elapsed = time.time() - start_time
return PollResult(success=True, data=result)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"{description}异常: {e}")
if i < max_retries - 1:
# 动态间隔: 前 fast_retries 次使用快速间隔
wait_time = fast_interval if i < fast_retries else interval
elapsed = time.time() - start_time
if on_progress:
on_progress(elapsed)
else:
log.progress_inline(f"[等待中... {elapsed:.0f}s]")
progress_shown = True
time.sleep(wait_time)
if progress_shown:
log.progress_clear()
elapsed = time.time() - start_time
return PollResult(success=False, error=f"超时 ({elapsed:.0f}s)")
# ==================== GPTMail 临时邮箱服务 ====================
class GPTMailService:
"""GPTMail 临时邮箱服务 (支持多 Key 轮询)"""
def __init__(self, api_base: str = None, api_key: str = None):
self.api_base = api_base or GPTMAIL_API_BASE
# 如果指定了 api_key 则使用指定的,否则使用轮询
self._fixed_key = api_key
def _get_headers(self, api_key: str = None) -> dict:
"""获取请求头 (支持指定 Key 或轮询)"""
key = api_key or self._fixed_key or get_next_gptmail_key()
self._current_key = key # 保存当前使用的 key
return {
"X-API-Key": key,
"Content-Type": "application/json"
}
def _get_key_display(self) -> str:
"""获取当前 key 的脱敏显示"""
key = getattr(self, '_current_key', None)
if not key:
return "未知"
if len(key) > 10:
return f"{key[:4]}...{key[-4:]}"
return key[:4] + "..."
def generate_email(self, prefix: str = None, domain: str = None, api_key: str = None) -> tuple[str, str]:
"""生成临时邮箱地址
Args:
prefix: 邮箱前缀 (可选)
domain: 域名 (可选)
api_key: 指定使用的 API Key (可选,不指定则轮询)
Returns:
tuple: (email, error) - 邮箱地址和错误信息
"""
url = f"{self.api_base}/api/generate-email"
headers = self._get_headers(api_key)
try:
if prefix or domain:
payload = {}
if prefix:
payload["prefix"] = prefix
if domain:
payload["domain"] = domain
response = http_session.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
else:
response = http_session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
email = data.get("data", {}).get("email", "")
log.success(f"GPTMail 生成邮箱: {email}")
return email, None
else:
error = data.get("error", "Unknown error")
log.error(f"GPTMail 生成邮箱失败: {error}")
return None, error
except Exception as e:
log.error(f"GPTMail 生成邮箱异常: {e}")
return None, str(e)
def get_emails(self, email: str) -> tuple[list, str]:
"""获取邮箱的邮件列表
Args:
email: 邮箱地址
Returns:
tuple: (emails, error) - 邮件列表和错误信息
"""
url = f"{self.api_base}/api/emails"
params = {"email": email}
headers = self._get_headers()
try:
response = http_session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
emails = data.get("data", {}).get("emails", [])
return emails, None
else:
error = data.get("error", "Unknown error")
return [], error
except Exception as e:
log.warning(f"GPTMail 获取邮件列表异常: {e}")
return [], str(e)
def get_email_detail(self, email_id: str) -> tuple[dict, str]:
"""获取单封邮件详情
Args:
email_id: 邮件ID
Returns:
tuple: (email_detail, error) - 邮件详情和错误信息
"""
url = f"{self.api_base}/api/email/{email_id}"
headers = self._get_headers()
try:
response = http_session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
return data.get("data", {}), None
else:
error = data.get("error", "Unknown error")
return {}, error
except Exception as e:
log.warning(f"GPTMail 获取邮件详情异常: {e}")
return {}, str(e)
def delete_email(self, email_id: str) -> tuple[bool, str]:
"""删除单封邮件
Args:
email_id: 邮件ID
Returns:
tuple: (success, error)
"""
url = f"{self.api_base}/api/email/{email_id}"
headers = self._get_headers()
try:
response = http_session.delete(url, headers=headers, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
return True, None
else:
return False, data.get("error", "Unknown error")
except Exception as e:
return False, str(e)
def clear_inbox(self, email: str) -> tuple[int, str]:
"""清空邮箱
Args:
email: 邮箱地址
Returns:
tuple: (deleted_count, error)
"""
url = f"{self.api_base}/api/emails/clear"
params = {"email": email}
headers = self._get_headers()
try:
response = http_session.delete(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
count = data.get("data", {}).get("count", 0)
return count, None
else:
return 0, data.get("error", "Unknown error")
except Exception as e:
return 0, str(e)
def test_api_key(self, api_key: str) -> tuple[bool, str]:
"""测试 API Key 是否有效
Args:
api_key: 要测试的 API Key
Returns:
tuple: (success, message)
"""
url = f"{self.api_base}/api/generate-email"
headers = {
"X-API-Key": api_key,
"Content-Type": "application/json"
}
try:
response = http_session.get(url, headers=headers, timeout=10)
data = response.json()
if data.get("success"):
email = data.get("data", {}).get("email", "")
return True, f"Key 有效,测试邮箱: {email}"
else:
error = data.get("error", "Unknown error")
return False, f"Key 无效: {error}"
except Exception as e:
return False, f"测试失败: {e}"
def get_verification_code(self, email: str, max_retries: int = None, interval: int = None) -> tuple[str, str, str]:
"""从邮箱获取验证码 (使用通用轮询重试)
Args:
email: 邮箱地址
max_retries: 最大重试次数
interval: 基础轮询间隔 (秒)
Returns:
tuple: (code, error, email_time) - 验证码、错误信息、邮件时间
"""
log.info(f"GPTMail 等待验证码: {email} (Key: {self._get_key_display()})", icon="email")
# 用于存储邮件时间的闭包变量
email_time_holder = [None]
def fetch_emails():
"""获取邮件列表"""
emails, error = self.get_emails(email)
return emails if emails else None
def check_for_code(emails):
"""检查邮件中是否有验证码"""
for email_item in emails:
subject = email_item.get("subject", "")
content = email_item.get("content", "")
email_time_holder[0] = email_item.get("created_at", "")
# 尝试从主题中提取验证码
code = self._extract_code(subject)
if code:
return code
# 尝试从内容中提取验证码
code = self._extract_code(content)
if code:
return code
return None
# 使用通用轮询函数
result = poll_with_retry(
fetch_func=fetch_emails,
check_func=check_for_code,
max_retries=max_retries,
interval=interval,
description="GPTMail 获取邮件"
)
if result.success:
log.success(f"GPTMail 验证码获取成功: {result.data}")
return result.data, None, email_time_holder[0]
else:
log.error(f"GPTMail 验证码获取失败 ({result.error})")
return None, "未能获取验证码", None
def _extract_code(self, text: str) -> str:
"""从文本中提取验证码"""
if not text:
return None
# 尝试多种模式
patterns = [
r"代码为\s*(\d{6})",
r"code is\s*(\d{6})",
r"verification code[:\s]*(\d{6})",
r"验证码[:\s]*(\d{6})",
r"(\d{6})", # 最后尝试直接匹配6位数字
]
for pattern in patterns:
match = re.search(pattern, text, re.IGNORECASE)
if match:
return match.group(1)
return None
# 全局 GPTMail 服务实例
gptmail_service = GPTMailService()
# ==================== 原有 KYX 邮箱服务 ====================
def generate_random_email() -> str:
"""生成随机邮箱地址: {random_str}oaiteam@{random_domain}"""
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
domain = get_random_domain()
email = f"{random_str}oaiteam@{domain}"
log.success(f"生成邮箱: {email}")
return email
def create_email_user(email: str, password: str = None, role_name: str = None) -> tuple[bool, str]:
"""在邮箱平台创建用户 (与 main.py 一致)
Args:
email: 邮箱地址
password: 密码,默认使用 DEFAULT_PASSWORD
role_name: 角色名,默认使用 EMAIL_ROLE
Returns:
tuple: (success, message)
"""
if password is None:
password = DEFAULT_PASSWORD
if role_name is None:
role_name = EMAIL_ROLE
url = f"{get_cloudmail_api_base()}/addUser"
headers = {
"Authorization": EMAIL_API_AUTH,
"Content-Type": "application/json"
}
payload = {
"list": [{"email": email, "password": password, "roleName": role_name}]
}
try:
log.info(f"创建邮箱用户: {email}", icon="email")
response = http_session.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
data = response.json()
success = data.get("code") == 200
msg = data.get("message", "Unknown error")
if success:
log.success("邮箱创建成功")
else:
log.warning(f"邮箱创建失败: {msg}")
return success, msg
except Exception as e:
log.error(f"邮箱创建异常: {e}")
return False, str(e)
def get_verification_code(email: str, max_retries: int = None, interval: int = None) -> tuple[str, str, str]:
"""从邮箱获取验证码 (使用通用轮询重试)
Args:
email: 邮箱地址
max_retries: 最大重试次数
interval: 基础轮询间隔 (秒)
Returns:
tuple: (code, error, email_time) - 验证码、错误信息、邮件时间
"""
url = f"{get_cloudmail_api_base()}/emailList"
headers = {
"Authorization": EMAIL_API_AUTH,
"Content-Type": "application/json"
}
payload = {"toEmail": email}
log.info(f"等待验证码邮件: {email}", icon="email")
# 记录初始邮件数量,用于检测新邮件
initial_email_count = 0
try:
response = http_session.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("code") == 200:
initial_email_count = len(data.get("data", []))
except Exception:
pass
# 用于存储邮件时间的闭包变量
email_time_holder = [None]
def fetch_emails():
"""获取邮件列表"""
response = http_session.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("code") == 200:
emails = data.get("data", [])
# 只返回有新邮件时的数据
if emails and len(emails) > initial_email_count:
return emails
return None
def extract_code_from_subject(subject: str) -> str:
"""从主题中提取验证码"""
patterns = [
r"代码为\s*(\d{6})",
r"code is\s*(\d{6})",
r"(\d{6})",
]
for pattern in patterns:
match = re.search(pattern, subject, re.IGNORECASE)
if match:
return match.group(1)
return None
def check_for_code(emails):
"""检查邮件中是否有验证码"""
latest_email = emails[0]
subject = latest_email.get("subject", "")
email_time_holder[0] = latest_email.get("createTime", "")
code = extract_code_from_subject(subject)
return code
# 使用通用轮询函数
result = poll_with_retry(
fetch_func=fetch_emails,
check_func=check_for_code,
max_retries=max_retries,
interval=interval,
description="获取邮件"
)
if result.success:
log.success(f"验证码获取成功: {result.data}")
return result.data, None, email_time_holder[0]
else:
log.error(f"验证码获取失败 ({result.error})")
return None, "未能获取验证码", None
def fetch_email_content(email: str) -> list:
"""获取邮箱中的邮件列表
Args:
email: 邮箱地址
Returns:
list: 邮件列表
"""
url = f"{get_cloudmail_api_base()}/emailList"
headers = {
"Authorization": EMAIL_API_AUTH,
"Content-Type": "application/json"
}
payload = {"toEmail": email}
try:
response = http_session.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("code") == 200:
return data.get("data", [])
except Exception as e:
log.warning(f"获取邮件列表异常: {e}")
return []
def batch_create_emails(count: int = 4, max_workers: int = 4) -> list:
"""批量创建邮箱 (并行版本,根据 EMAIL_PROVIDER 配置自动选择邮箱系统)
Args:
count: 创建数量
max_workers: 最大并行线程数,默认 4
Returns:
list: [{"email": "...", "password": "..."}, ...]
"""
accounts = []
failed_count = 0
# 使用线程池并行创建邮箱
with ThreadPoolExecutor(max_workers=min(count, max_workers)) as executor:
# 提交所有创建任务
futures = {executor.submit(unified_create_email): i for i in range(count)}
# 收集结果
for future in as_completed(futures):
task_idx = futures[future]
try:
email, password = future.result()
if email:
accounts.append({
"email": email,
"password": password
})
else:
failed_count += 1
log.warning(f"邮箱创建失败 (任务 {task_idx + 1})")
except Exception as e:
failed_count += 1
log.warning(f"邮箱创建异常 (任务 {task_idx + 1}): {e}")
log.info(f"邮箱创建完成: {len(accounts)}/{count}" + (f" (失败 {failed_count})" if failed_count else ""), icon="email")
return accounts
# ==================== 统一邮箱接口 (根据配置自动选择) ====================
def unified_generate_email() -> str:
"""统一生成邮箱地址接口 (根据 EMAIL_PROVIDER 配置自动选择)
Returns:
str: 邮箱地址
"""
# 每次调用时重新获取配置,支持动态切换
from config import EMAIL_PROVIDER as current_provider
if current_provider == "gptmail":
# 生成随机前缀 + oaiteam 后缀,确保不重复
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
prefix = f"{random_str}-oaiteam"
domain = get_random_gptmail_domain() or None
email, error = gptmail_service.generate_email(prefix=prefix, domain=domain)
if email:
return email
log.warning(f"GPTMail 生成失败,回退到 KYX: {error}")
# 默认使用 KYX / Cloud Mail 系统
return generate_random_email()
def unified_create_email() -> tuple[str, str]:
"""统一创建邮箱接口 (根据 EMAIL_PROVIDER 配置自动选择)
Returns:
tuple: (email, password)
"""
# 每次调用时重新获取配置,支持动态切换
from config import EMAIL_PROVIDER as current_provider
if current_provider == "gptmail":
# 生成随机前缀 + oaiteam 后缀,确保不重复
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=8))
prefix = f"{random_str}-oaiteam"
domain = get_random_gptmail_domain() or None
email, error = gptmail_service.generate_email(prefix=prefix, domain=domain)
if email:
# GPTMail 不需要密码,但为了接口一致性返回默认密码
return email, DEFAULT_PASSWORD
log.warning(f"GPTMail 生成失败,回退到 KYX: {error}")
# 默认使用 KYX / Cloud Mail 系统
email = generate_random_email()
success, msg = create_email_user(email, DEFAULT_PASSWORD)
if success or "已存在" in msg:
return email, DEFAULT_PASSWORD
return None, None
def unified_get_verification_code(email: str, max_retries: int = None, interval: int = None) -> tuple[str, str, str]:
"""统一获取验证码接口 (根据 EMAIL_PROVIDER 配置自动选择)
Args:
email: 邮箱地址
max_retries: 最大重试次数
interval: 轮询间隔 (秒)
Returns:
tuple: (code, error, email_time) - 验证码、错误信息、邮件时间
"""
# 每次调用时重新获取配置,支持动态切换
from config import EMAIL_PROVIDER as current_provider
if current_provider == "gptmail":
return gptmail_service.get_verification_code(email, max_retries, interval)
# 默认使用 KYX / Cloud Mail 系统
return get_verification_code(email, max_retries, interval)
def unified_fetch_emails(email: str) -> list:
"""统一获取邮件列表接口 (根据 EMAIL_PROVIDER 配置自动选择)
Args:
email: 邮箱地址
Returns:
list: 邮件列表
"""
# 每次调用时重新获取配置,支持动态切换
from config import EMAIL_PROVIDER as current_provider
if current_provider == "gptmail":
emails, error = gptmail_service.get_emails(email)
return emails
# 默认使用 KYX / Cloud Mail 系统
return fetch_email_content(email)