import time import re import random import threading import requests as standard_requests # 用于普通API交互(不走代理,直连邮件服务器) class MailSystem: """单个邮箱系统实例,支持多域名""" def __init__(self, base_url, api_token, domains): self.base_url = base_url self.domains = domains # 该系统支持的域名列表 self.token = api_token self.headers = {"Authorization": self.token} if self.token: print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...") else: print(f"[-] 邮箱系统 Token 为空 ({self.base_url})") def create_user(self, email_prefix, domain=None): """在系统里注册一个新邮箱用户""" if domain is None: domain = random.choice(self.domains) full_email = f"{email_prefix}@{domain}" url = f"{self.base_url}/api/public/addUser" payload = { "list": [ { "email": full_email, "password": "random_pass_ignoring_this" } ] } try: resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15) if resp.json().get('code') == 200: print(f"[+] 邮箱用户创建成功: {full_email}") return full_email elif resp.status_code in (401, 403): print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}") return None else: print(f"[-] 创建邮箱失败: {resp.text}") return None except Exception as e: print(f"[-] 创建邮箱请求异常: {e}") return None def wait_for_email(self, to_email, retry_count=20, sleep_time=3, stop_check=None): """像猎人一样耐心等待猎物出现,支持外部中断""" url = f"{self.base_url}/api/public/emailList" payload = { "toEmail": to_email, "sendName": "Anthropic", "num": 1, "size": 10, "timeSort": "desc" } print(f"[*] 开始轮询邮件,目标: {to_email}...") for i in range(retry_count): # 检查外部中断信号 if stop_check and stop_check(): print("[!] 收到停止信号,中断邮件轮询") return None try: resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15) data = resp.json() if resp.status_code in (401, 403): print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}") return None if data.get('code') == 200 and data.get('data'): emails = data['data'] for email in emails: print(f"[!] 捕获到邮件! 主题: {email.get('subject')}") return email.get('content') or email.get('text') print(f"[*] 轮询中 ({i+1}/{retry_count})...") time.sleep(sleep_time) except Exception as e: print(f"[-] 轮询出错: {e}") time.sleep(sleep_time) print("[-] 等待超时,未收到邮件。") return None def check_health(self) -> dict: """检查该邮箱系统的连通性和 Token 有效性""" if not self.token: return {"ok": False, "message": "Token 未配置"} try: url = f"{self.base_url}/api/public/emailList" payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1} resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=10) if resp.status_code == 200: return {"ok": True, "message": "连接正常"} elif resp.status_code in (401, 403): return {"ok": False, "message": f"Token 无效 (HTTP {resp.status_code})"} else: return {"ok": False, "message": f"异常响应 (HTTP {resp.status_code})"} except standard_requests.exceptions.ConnectTimeout: return {"ok": False, "message": "连接超时"} except standard_requests.exceptions.ConnectionError: return {"ok": False, "message": "无法连接"} except Exception as e: return {"ok": False, "message": f"异常: {e}"} def __repr__(self): return f"MailSystem({self.base_url}, domains={self.domains})" class MailPool: """多邮箱系统轮询调度器""" def __init__(self, mail_configs: list[dict]): """ mail_configs: config.MAIL_SYSTEMS 格式的列表 """ self.systems: list[MailSystem] = [] self._index = 0 self._lock = threading.Lock() for cfg in mail_configs: ms = MailSystem( base_url=cfg["base_url"], api_token=cfg.get("api_token", ""), domains=cfg["domains"], ) if ms.token: # 只添加连接成功的系统 self.systems.append(ms) else: print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}") if not self.systems: print("[-] 没有可用的邮箱系统!") print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)} 个") def next(self) -> MailSystem | None: """Round-robin 返回下一个系统""" if not self.systems: return None with self._lock: ms = self.systems[self._index % len(self.systems)] self._index += 1 return ms def create_user(self, email_prefix) -> tuple[str | None, MailSystem | None]: """ 使用下一个系统创建用户。 返回 (email, 对应的 MailSystem) 或 (None, None)。 """ ms = self.next() if not ms: print("[-] 没有可用的邮箱系统") return None, None email = ms.create_user(email_prefix) return email, ms def get_system_by_domain(self, email: str) -> MailSystem | None: """根据邮箱域名找到对应的 MailSystem""" domain = email.split("@")[-1] if "@" in email else "" for ms in self.systems: if domain in ms.domains: return ms return None @property def count(self) -> int: return len(self.systems) def info(self) -> str: """返回所有系统的信息摘要""" lines = [f"📬 邮箱系统池(共 {self.count} 个):"] for i, ms in enumerate(self.systems, 1): domains = ", ".join(ms.domains) lines.append(f" {i}. {ms.base_url} → [{domains}]") return "\n".join(lines) def extract_magic_link(html_content): """HTML提取出链接""" if not html_content: return None pattern = r'href="(https://claude\.ai/[^"]+)"' match = re.search(pattern, html_content) if match: url = match.group(1) return url.replace("&", "&") return None