Files
autoPlus/test_cloudmail_standalone.py
2026-01-26 15:04:02 +08:00

184 lines
5.2 KiB
Python

#!/usr/bin/env python3
"""
Cloud Mail API 独立测试脚本
使用方法:
1. 配置 .env 文件中的 Cloud Mail 参数
2. 运行: python test_cloudmail_standalone.py
"""
import asyncio
import time
from config import load_config
from utils.mail_box import CloudMailHandler
from utils.logger import logger
async def test_email_query(handler: CloudMailHandler, test_email: str):
"""测试邮件查询功能"""
logger.info("=" * 60)
logger.info("测试 1: 查询最近的邮件")
logger.info("=" * 60)
try:
emails = await handler._query_emails(
to_email=test_email,
time_sort="desc",
size=5
)
logger.success(f"✓ 查询到 {len(emails)} 封邮件")
if emails:
for i, email in enumerate(emails, 1):
logger.info(
f" 邮件 {i}:\n"
f" 发件人: {email.get('sendEmail')}\n"
f" 主题: {email.get('subject')}\n"
f" 时间: {email.get('createTime')}"
)
else:
logger.warning(" (邮箱为空)")
return len(emails) > 0
except Exception as e:
logger.error(f"✗ 测试失败: {e}")
return False
async def test_otp_waiting(handler: CloudMailHandler, test_email: str):
"""测试 OTP 等待功能"""
logger.info("")
logger.info("=" * 60)
logger.info("测试 2: OTP 等待功能")
logger.info("=" * 60)
logger.warning(f"请在 60 秒内向 {test_email} 发送测试 OTP 邮件")
logger.warning(f"发件人应为: {handler.OTP_SENDER}")
try:
otp = await handler.wait_for_otp(test_email, timeout=60)
logger.success(f"✓ OTP 接收成功: {otp}")
return True
except TimeoutError:
logger.error("✗ 超时未收到 OTP")
return False
except Exception as e:
logger.error(f"✗ 测试失败: {e}")
return False
async def test_add_user(handler: CloudMailHandler):
"""测试添加用户功能"""
logger.info("")
logger.info("=" * 60)
logger.info("测试 3: 添加测试用户")
logger.info("=" * 60)
# 生成测试邮箱
test_users = [
{"email": f"test_{int(time.time())}@example.com"}
]
try:
result = await handler.add_users(test_users)
logger.success(f"✓ 用户创建请求已发送")
logger.info(f" 响应: {result}")
return True
except Exception as e:
logger.error(f"✗ 测试失败: {e}")
return False
async def main():
"""主测试流程"""
logger.info("=" * 60)
logger.info("Cloud Mail API 测试开始")
logger.info("=" * 60)
# 加载配置
try:
config = load_config()
except Exception as e:
logger.error(f"配置加载失败: {e}")
return
# 验证配置
if not config.mail.enabled or config.mail.type != "cloudmail":
logger.error("")
logger.error("请在 .env 中配置 Cloud Mail 参数:")
logger.error(" MAIL_ENABLED=true")
logger.error(" MAIL_TYPE=cloudmail")
logger.error(" MAIL_CLOUDMAIL_API_URL=https://your-domain.com")
logger.error(" MAIL_CLOUDMAIL_TOKEN=your_token")
return
# 初始化 handler
try:
handler = CloudMailHandler(config.mail.to_dict())
except Exception as e:
logger.error(f"CloudMailHandler 初始化失败: {e}")
return
# 获取测试邮箱
logger.info("")
test_email = input("请输入测试邮箱地址: ").strip()
if not test_email:
logger.error("未输入邮箱地址,退出测试")
return
# 运行测试
results = {}
try:
# 测试 1: 邮件查询
results["email_query"] = await test_email_query(handler, test_email)
# 测试 2: OTP 等待(可选)
if input("\n是否测试 OTP 等待功能? (y/N): ").lower() == 'y':
results["otp_waiting"] = await test_otp_waiting(handler, test_email)
# 测试 3: 添加用户(可选)
if input("\n是否测试添加用户功能? (y/N): ").lower() == 'y':
results["add_user"] = await test_add_user(handler)
finally:
# 清理资源
await handler.close()
# 测试总结
logger.info("")
logger.info("=" * 60)
logger.info("测试结果总结")
logger.info("=" * 60)
if results:
for name, passed in results.items():
status = "✓ 通过" if passed else "✗ 失败"
logger.info(f" {name}: {status}")
# 总体结果
total = len(results)
passed = sum(1 for v in results.values() if v)
logger.info("")
logger.info(f"总计: {passed}/{total} 测试通过")
if passed == total:
logger.success("所有测试通过!✅")
else:
logger.warning(f"部分测试失败 ({total - passed} 个)")
else:
logger.warning("未执行任何测试")
logger.info("=" * 60)
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.warning("\n测试被用户中断")
except Exception as e:
logger.exception(f"测试过程中发生未捕获的异常: {e}")