#!/usr/bin/env python3 """ Cloud Mail API 独立测试脚本 使用方法: 1. 配置 .env 文件中的 Cloud Mail 参数 2. 运行: python test_cloudmail_standalone.py """ import asyncio import time from config import load_config from utils.mail_box import CloudMailHandler from utils.logger import logger async def test_email_query(handler: CloudMailHandler, test_email: str): """测试邮件查询功能""" logger.info("=" * 60) logger.info("测试 1: 查询最近的邮件") logger.info("=" * 60) try: emails = await handler._query_emails( to_email=test_email, time_sort="desc", size=5 ) logger.success(f"✓ 查询到 {len(emails)} 封邮件") if emails: for i, email in enumerate(emails, 1): logger.info( f" 邮件 {i}:\n" f" 发件人: {email.get('sendEmail')}\n" f" 主题: {email.get('subject')}\n" f" 时间: {email.get('createTime')}" ) else: logger.warning(" (邮箱为空)") return len(emails) > 0 except Exception as e: logger.error(f"✗ 测试失败: {e}") return False async def test_otp_waiting(handler: CloudMailHandler, test_email: str): """测试 OTP 等待功能""" logger.info("") logger.info("=" * 60) logger.info("测试 2: OTP 等待功能") logger.info("=" * 60) logger.warning(f"请在 60 秒内向 {test_email} 发送测试 OTP 邮件") logger.warning(f"发件人应为: {handler.OTP_SENDER}") try: otp = await handler.wait_for_otp(test_email, timeout=60) logger.success(f"✓ OTP 接收成功: {otp}") return True except TimeoutError: logger.error("✗ 超时未收到 OTP") return False except Exception as e: logger.error(f"✗ 测试失败: {e}") return False async def test_add_user(handler: CloudMailHandler): """测试添加用户功能""" logger.info("") logger.info("=" * 60) logger.info("测试 3: 添加测试用户") logger.info("=" * 60) # 生成测试邮箱 test_users = [ {"email": f"test_{int(time.time())}@example.com"} ] try: result = await handler.add_users(test_users) logger.success(f"✓ 用户创建请求已发送") logger.info(f" 响应: {result}") return True except Exception as e: logger.error(f"✗ 测试失败: {e}") return False async def main(): """主测试流程""" logger.info("=" * 60) logger.info("Cloud Mail API 测试开始") logger.info("=" * 60) # 加载配置 try: config = load_config() except Exception as e: logger.error(f"配置加载失败: {e}") return # 验证配置 if not config.mail.enabled or config.mail.type != "cloudmail": logger.error("") logger.error("请在 .env 中配置 Cloud Mail 参数:") logger.error(" MAIL_ENABLED=true") logger.error(" MAIL_TYPE=cloudmail") logger.error(" MAIL_CLOUDMAIL_API_URL=https://your-domain.com") logger.error(" MAIL_CLOUDMAIL_TOKEN=your_token") return # 初始化 handler try: handler = CloudMailHandler(config.mail.to_dict()) except Exception as e: logger.error(f"CloudMailHandler 初始化失败: {e}") return # 获取测试邮箱 logger.info("") test_email = input("请输入测试邮箱地址: ").strip() if not test_email: logger.error("未输入邮箱地址,退出测试") return # 运行测试 results = {} try: # 测试 1: 邮件查询 results["email_query"] = await test_email_query(handler, test_email) # 测试 2: OTP 等待(可选) if input("\n是否测试 OTP 等待功能? (y/N): ").lower() == 'y': results["otp_waiting"] = await test_otp_waiting(handler, test_email) # 测试 3: 添加用户(可选) if input("\n是否测试添加用户功能? (y/N): ").lower() == 'y': results["add_user"] = await test_add_user(handler) finally: # 清理资源 await handler.close() # 测试总结 logger.info("") logger.info("=" * 60) logger.info("测试结果总结") logger.info("=" * 60) if results: for name, passed in results.items(): status = "✓ 通过" if passed else "✗ 失败" logger.info(f" {name}: {status}") # 总体结果 total = len(results) passed = sum(1 for v in results.values() if v) logger.info("") logger.info(f"总计: {passed}/{total} 测试通过") if passed == total: logger.success("所有测试通过!✅") else: logger.warning(f"部分测试失败 ({total - passed} 个)") else: logger.warning("未执行任何测试") logger.info("=" * 60) if __name__ == "__main__": try: asyncio.run(main()) except KeyboardInterrupt: logger.warning("\n测试被用户中断") except Exception as e: logger.exception(f"测试过程中发生未捕获的异常: {e}")