feat(config, email_service, telegram_bot): Add dynamic GPTMail API key management and config reload capability

- Add reload_config() function to dynamically reload config.toml and team.json without restart
- Implement GPTMail API key management with support for multiple keys (api_keys list)
- Add functions to manage GPTMail keys: get_gptmail_keys(), add_gptmail_key(), remove_gptmail_key()
- Add key rotation strategies: get_next_gptmail_key() (round-robin) and get_random_gptmail_key()
- Add gptmail_keys.json file support for dynamic key management
- Fix config parsing to handle include_team_owners and proxy settings in multiple locations
- Update email_service.py to use key rotation for GPTMail API calls
- Update telegram_bot.py to support dynamic key management
- Add install_service.sh script for service installation
- Update config.toml.example with new api_keys configuration option
- Improve configuration flexibility by supporting both old (api_key) and new (api_keys) formats
This commit is contained in:
2026-01-17 05:52:05 +08:00
parent 64707768f8
commit b902922d22
5 changed files with 1067 additions and 84 deletions

View File

@@ -9,6 +9,7 @@ import requests
from typing import Callable, TypeVar, Optional, Any
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from concurrent.futures import ThreadPoolExecutor, as_completed
from config import (
EMAIL_API_BASE,
@@ -21,10 +22,11 @@ from config import (
get_random_domain,
EMAIL_PROVIDER,
GPTMAIL_API_BASE,
GPTMAIL_API_KEY,
GPTMAIL_PREFIX,
GPTMAIL_DOMAINS,
get_random_gptmail_domain,
get_next_gptmail_key,
get_gptmail_keys,
)
from logger import log
@@ -136,27 +138,34 @@ def poll_with_retry(
# ==================== GPTMail 临时邮箱服务 ====================
class GPTMailService:
"""GPTMail 临时邮箱服务"""
"""GPTMail 临时邮箱服务 (支持多 Key 轮询)"""
def __init__(self, api_base: str = None, api_key: str = None):
self.api_base = api_base or GPTMAIL_API_BASE
self.api_key = api_key or GPTMAIL_API_KEY
self.headers = {
"X-API-Key": self.api_key,
# 如果指定了 api_key 则使用指定的,否则使用轮询
self._fixed_key = api_key
def _get_headers(self, api_key: str = None) -> dict:
"""获取请求头 (支持指定 Key 或轮询)"""
key = api_key or self._fixed_key or get_next_gptmail_key()
return {
"X-API-Key": key,
"Content-Type": "application/json"
}
def generate_email(self, prefix: str = None, domain: str = None) -> tuple[str, str]:
def generate_email(self, prefix: str = None, domain: str = None, api_key: str = None) -> tuple[str, str]:
"""生成临时邮箱地址
Args:
prefix: 邮箱前缀 (可选)
domain: 域名 (可选)
api_key: 指定使用的 API Key (可选,不指定则轮询)
Returns:
tuple: (email, error) - 邮箱地址和错误信息
"""
url = f"{self.api_base}/api/generate-email"
headers = self._get_headers(api_key)
try:
if prefix or domain:
@@ -165,9 +174,9 @@ class GPTMailService:
payload["prefix"] = prefix
if domain:
payload["domain"] = domain
response = http_session.post(url, headers=self.headers, json=payload, timeout=REQUEST_TIMEOUT)
response = http_session.post(url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)
else:
response = http_session.get(url, headers=self.headers, timeout=REQUEST_TIMEOUT)
response = http_session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
data = response.json()
@@ -195,9 +204,10 @@ class GPTMailService:
"""
url = f"{self.api_base}/api/emails"
params = {"email": email}
headers = self._get_headers()
try:
response = http_session.get(url, headers=self.headers, params=params, timeout=REQUEST_TIMEOUT)
response = http_session.get(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
@@ -221,9 +231,10 @@ class GPTMailService:
tuple: (email_detail, error) - 邮件详情和错误信息
"""
url = f"{self.api_base}/api/email/{email_id}"
headers = self._get_headers()
try:
response = http_session.get(url, headers=self.headers, timeout=REQUEST_TIMEOUT)
response = http_session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
@@ -246,9 +257,10 @@ class GPTMailService:
tuple: (success, error)
"""
url = f"{self.api_base}/api/email/{email_id}"
headers = self._get_headers()
try:
response = http_session.delete(url, headers=self.headers, timeout=REQUEST_TIMEOUT)
response = http_session.delete(url, headers=headers, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
@@ -270,9 +282,10 @@ class GPTMailService:
"""
url = f"{self.api_base}/api/emails/clear"
params = {"email": email}
headers = self._get_headers()
try:
response = http_session.delete(url, headers=self.headers, params=params, timeout=REQUEST_TIMEOUT)
response = http_session.delete(url, headers=headers, params=params, timeout=REQUEST_TIMEOUT)
data = response.json()
if data.get("success"):
@@ -284,6 +297,35 @@ class GPTMailService:
except Exception as e:
return 0, str(e)
def test_api_key(self, api_key: str) -> tuple[bool, str]:
"""测试 API Key 是否有效
Args:
api_key: 要测试的 API Key
Returns:
tuple: (success, message)
"""
url = f"{self.api_base}/api/generate-email"
headers = {
"X-API-Key": api_key,
"Content-Type": "application/json"
}
try:
response = http_session.get(url, headers=headers, timeout=10)
data = response.json()
if data.get("success"):
email = data.get("data", {}).get("email", "")
return True, f"Key 有效,测试邮箱: {email}"
else:
error = data.get("error", "Unknown error")
return False, f"Key 无效: {error}"
except Exception as e:
return False, f"测试失败: {e}"
def get_verification_code(self, email: str, max_retries: int = None, interval: int = None) -> tuple[str, str, str]:
"""从邮箱获取验证码 (使用通用轮询重试)
@@ -532,29 +574,42 @@ def fetch_email_content(email: str) -> list:
return []
def batch_create_emails(count: int = 4) -> list:
"""批量创建邮箱 (根据 EMAIL_PROVIDER 配置自动选择邮箱系统)
def batch_create_emails(count: int = 4, max_workers: int = 4) -> list:
"""批量创建邮箱 (并行版本,根据 EMAIL_PROVIDER 配置自动选择邮箱系统)
Args:
count: 创建数量
max_workers: 最大并行线程数,默认 4
Returns:
list: [{"email": "...", "password": "..."}, ...]
"""
accounts = []
failed_count = 0
for i in range(count):
email, password = unified_create_email()
# 使用线程池并行创建邮箱
with ThreadPoolExecutor(max_workers=min(count, max_workers)) as executor:
# 提交所有创建任务
futures = {executor.submit(unified_create_email): i for i in range(count)}
if email:
accounts.append({
"email": email,
"password": password
})
else:
log.warning(f"跳过第 {i+1} 个邮箱创建")
# 收集结果
for future in as_completed(futures):
task_idx = futures[future]
try:
email, password = future.result()
if email:
accounts.append({
"email": email,
"password": password
})
else:
failed_count += 1
log.warning(f"邮箱创建失败 (任务 {task_idx + 1})")
except Exception as e:
failed_count += 1
log.warning(f"邮箱创建异常 (任务 {task_idx + 1}): {e}")
log.info(f"邮箱创建完成: {len(accounts)}/{count}", icon="email")
log.info(f"邮箱创建完成: {len(accounts)}/{count}" + (f" (失败 {failed_count})" if failed_count else ""), icon="email")
return accounts