feat: Implement thread-safe account and statistics management and integrate proxy support for all external requests.

This commit is contained in:
2026-02-13 03:32:27 +08:00
parent 1c58279292
commit ef23318090
12 changed files with 1041 additions and 84 deletions

View File

@@ -4,6 +4,8 @@ import random
import threading
import requests as standard_requests # 用于普通API交互
from config import get_proxy
class MailSystem:
"""单个邮箱系统实例,支持多域名"""
@@ -32,16 +34,23 @@ class MailSystem:
}
]
}
resp = standard_requests.post(url, json=payload, headers=self.headers)
if resp.json().get('code') == 200:
print(f"[+] 邮箱用户创建成功: {full_email}")
return full_email
else:
print(f"[-] 创建邮箱失败: {resp.text}")
try:
resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15)
if resp.json().get('code') == 200:
print(f"[+] 邮箱用户创建成功: {full_email}")
return full_email
elif resp.status_code in (401, 403):
print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
return None
else:
print(f"[-] 创建邮箱失败: {resp.text}")
return None
except Exception as e:
print(f"[-] 创建邮箱请求异常: {e}")
return None
def wait_for_email(self, to_email, retry_count=20, sleep_time=3):
"""像猎人一样耐心等待猎物出现"""
def wait_for_email(self, to_email, retry_count=20, sleep_time=3, stop_check=None):
"""像猎人一样耐心等待猎物出现,支持外部中断"""
url = f"{self.base_url}/api/public/emailList"
payload = {
"toEmail": to_email,
@@ -54,10 +63,19 @@ class MailSystem:
print(f"[*] 开始轮询邮件,目标: {to_email}...")
for i in range(retry_count):
# 检查外部中断信号
if stop_check and stop_check():
print("[!] 收到停止信号,中断邮件轮询")
return None
try:
resp = standard_requests.post(url, json=payload, headers=self.headers)
resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=15)
data = resp.json()
if resp.status_code in (401, 403):
print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
return None
if data.get('code') == 200 and data.get('data'):
emails = data['data']
for email in emails:
@@ -73,6 +91,27 @@ class MailSystem:
print("[-] 等待超时,未收到邮件。")
return None
def check_health(self) -> dict:
"""检查该邮箱系统的连通性和 Token 有效性"""
if not self.token:
return {"ok": False, "message": "Token 未配置"}
try:
url = f"{self.base_url}/api/public/emailList"
payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1}
resp = standard_requests.post(url, json=payload, headers=self.headers, proxies=get_proxy(), timeout=10)
if resp.status_code == 200:
return {"ok": True, "message": "连接正常"}
elif resp.status_code in (401, 403):
return {"ok": False, "message": f"Token 无效 (HTTP {resp.status_code})"}
else:
return {"ok": False, "message": f"异常响应 (HTTP {resp.status_code})"}
except standard_requests.exceptions.ConnectTimeout:
return {"ok": False, "message": "连接超时"}
except standard_requests.exceptions.ConnectionError:
return {"ok": False, "message": "无法连接"}
except Exception as e:
return {"ok": False, "message": f"异常: {e}"}
def __repr__(self):
return f"MailSystem({self.base_url}, domains={self.domains})"