import time import re import random import threading import requests as standard_requests # 用于普通API交互 class MailSystem: """单个邮箱系统实例,支持多域名""" def __init__(self, base_url, api_token, domains): self.base_url = base_url self.domains = domains # 该系统支持的域名列表 self.token = api_token self.headers = {"Authorization": self.token} if self.token: print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...") else: print(f"[-] 邮箱系统 Token 为空 ({self.base_url})") def create_user(self, email_prefix, domain=None): """在系统里注册一个新邮箱用户""" if domain is None: domain = random.choice(self.domains) full_email = f"{email_prefix}@{domain}" url = f"{self.base_url}/api/public/addUser" payload = { "list": [ { "email": full_email, "password": "random_pass_ignoring_this" } ] } resp = standard_requests.post(url, json=payload, headers=self.headers) if resp.json().get('code') == 200: print(f"[+] 邮箱用户创建成功: {full_email}") return full_email else: print(f"[-] 创建邮箱失败: {resp.text}") return None def wait_for_email(self, to_email, retry_count=20, sleep_time=3): """像猎人一样耐心等待猎物出现""" url = f"{self.base_url}/api/public/emailList" payload = { "toEmail": to_email, "sendName": "Anthropic", "num": 1, "size": 10, "timeSort": "desc" } print(f"[*] 开始轮询邮件,目标: {to_email}...") for i in range(retry_count): try: resp = standard_requests.post(url, json=payload, headers=self.headers) data = resp.json() if data.get('code') == 200 and data.get('data'): emails = data['data'] for email in emails: print(f"[!] 捕获到邮件! 主题: {email.get('subject')}") return email.get('content') or email.get('text') print(f"[*] 轮询中 ({i+1}/{retry_count})...") time.sleep(sleep_time) except Exception as e: print(f"[-] 轮询出错: {e}") time.sleep(sleep_time) print("[-] 等待超时,未收到邮件。") return None def __repr__(self): return f"MailSystem({self.base_url}, domains={self.domains})" class MailPool: """多邮箱系统轮询调度器""" def __init__(self, mail_configs: list[dict]): """ mail_configs: config.MAIL_SYSTEMS 格式的列表 """ self.systems: list[MailSystem] = [] self._index = 0 self._lock = threading.Lock() for cfg in mail_configs: ms = MailSystem( base_url=cfg["base_url"], api_token=cfg.get("api_token", ""), domains=cfg["domains"], ) if ms.token: # 只添加连接成功的系统 self.systems.append(ms) else: print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}") if not self.systems: print("[-] 没有可用的邮箱系统!") print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)} 个") def next(self) -> MailSystem | None: """Round-robin 返回下一个系统""" if not self.systems: return None with self._lock: ms = self.systems[self._index % len(self.systems)] self._index += 1 return ms def create_user(self, email_prefix) -> tuple[str | None, MailSystem | None]: """ 使用下一个系统创建用户。 返回 (email, 对应的 MailSystem) 或 (None, None)。 """ ms = self.next() if not ms: print("[-] 没有可用的邮箱系统") return None, None email = ms.create_user(email_prefix) return email, ms def get_system_by_domain(self, email: str) -> MailSystem | None: """根据邮箱域名找到对应的 MailSystem""" domain = email.split("@")[-1] if "@" in email else "" for ms in self.systems: if domain in ms.domains: return ms return None @property def count(self) -> int: return len(self.systems) def info(self) -> str: """返回所有系统的信息摘要""" lines = [f"📬 邮箱系统池(共 {self.count} 个):"] for i, ms in enumerate(self.systems, 1): domains = ", ".join(ms.domains) lines.append(f" {i}. {ms.base_url} → [{domains}]") return "\n".join(lines) def extract_magic_link(html_content): """HTML提取出链接""" if not html_content: return None pattern = r'href="(https://claude\.ai/[^"]+)"' match = re.search(pattern, html_content) if match: url = match.group(1) return url.replace("&", "&") return None