feat(telegram_bot): Add Cloud Mail configuration and batch team processing

- Add Cloud Mail API configuration support (api_base, api_auth, domains) in config.py
- Implement run_teams_by_count() function for processing specified number of teams with smart filtering
- Add Cloud Mail management commands: /cloudmail, /cloudmail_token, /cloudmail_api, /cloudmail_domains
- Add callback handlers for run_all, run, and cloudmail interactive operations
- Refactor /run command to use interactive selection for count and email service instead of direct argument
- Update bot command descriptions and help text to reflect new functionality
- Add Cloud Mail domain management and token configuration capabilities
- Enable batch processing with progress tracking and automatic team completion detection
This commit is contained in:
2026-01-30 14:07:37 +08:00
parent 79c3eb733c
commit f39dff8ee6
3 changed files with 936 additions and 54 deletions

View File

@@ -307,6 +307,7 @@ def reload_config() -> dict:
"""
global _cfg, _raw_teams, TEAMS
global EMAIL_PROVIDER, INCLUDE_TEAM_OWNERS, AUTH_PROVIDER
global EMAIL_API_BASE, EMAIL_API_AUTH, EMAIL_DOMAINS, EMAIL_DOMAIN
global BROWSER_HEADLESS, ACCOUNTS_PER_TEAM
global GPTMAIL_API_KEYS, GPTMAIL_DOMAINS, GPTMAIL_PREFIX
global PROXY_ENABLED, PROXIES
@@ -361,6 +362,13 @@ def reload_config() -> dict:
GPTMAIL_DOMAINS = _gptmail.get("domains", [])
GPTMAIL_API_KEYS = _gptmail.get("api_keys", []) or ["gpt-test"]
# Cloud Mail (email) 配置
_email = _cfg.get("email", {})
EMAIL_API_BASE = _email.get("api_base", "")
EMAIL_API_AUTH = _email.get("api_auth", "")
EMAIL_DOMAINS = _email.get("domains", []) or ([_email["domain"]] if _email.get("domain") else [])
EMAIL_DOMAIN = EMAIL_DOMAINS[0] if EMAIL_DOMAINS else ""
# 代理配置
_proxy_enabled_top = _cfg.get("proxy_enabled")
_proxy_enabled_browser = _cfg.get("browser", {}).get("proxy_enabled")

79
run.py
View File

@@ -1298,6 +1298,85 @@ def run_single_team(team_index: int = 0):
return _current_results
def run_teams_by_count(count: int):
"""运行指定数量的 Team
Args:
count: 要处理的 Team 数量
"""
global _tracker, _current_results, _shutdown_requested
log.header("ChatGPT Team 批量注册自动化")
# 打印系统配置
_print_system_config()
# 限制数量不超过总数
actual_count = min(count, len(TEAMS))
log.info(f"选择处理前 {actual_count} 个 Team (共 {len(TEAMS)} 个)", icon="team")
log.info(f"统一密码: {DEFAULT_PASSWORD}", icon="code")
log.info("按 Ctrl+C 可安全退出并保存进度")
log.separator()
# 先显示整体状态
_tracker = load_team_tracker()
_current_results = []
# 筛选需要处理的 Team (只取前 count 个中需要处理的)
teams_to_process = []
for i, team in enumerate(TEAMS[:actual_count]):
team_name = team["name"]
team_accounts = _tracker.get("teams", {}).get(team_name, [])
member_accounts = [acc for acc in team_accounts if acc.get("role") != "owner"]
owner_accounts = [acc for acc in team_accounts if acc.get("role") == "owner" and acc.get("status") != "completed"]
completed_count = sum(1 for acc in member_accounts if acc.get("status") == "completed")
member_count = len(member_accounts)
needs_processing = (
member_count < ACCOUNTS_PER_TEAM or
completed_count < member_count or
len(owner_accounts) > 0
)
if needs_processing:
teams_to_process.append((i, team))
if not teams_to_process:
log.success("选定的 Team 已全部完成处理,无需继续")
return _current_results
skipped_count = actual_count - len(teams_to_process)
if skipped_count > 0:
log.info(f"跳过 {skipped_count} 个已完成的 Team处理剩余 {len(teams_to_process)}")
teams_total = len(teams_to_process)
with Timer("全部流程"):
for idx, (original_idx, team) in enumerate(teams_to_process):
if _shutdown_requested:
log.warning("检测到中断请求,停止处理...")
break
log.separator("", 60)
team_email = team.get('account') or team.get('owner_email', '')
log.highlight(f"Team {idx + 1}/{teams_total}: {team['name']} ({team_email})", icon="team")
log.separator("", 60)
results, _ = process_single_team(team, team_index=idx + 1, teams_total=teams_total)
_current_results.extend(results)
if idx < teams_total - 1 and not _shutdown_requested:
wait_time = 3
log.countdown(wait_time, "下一个 Team")
print_summary(_current_results)
return _current_results
def test_email_only():
"""测试模式: 只创建邮箱和邀请,不注册"""
global _tracker

View File

@@ -142,6 +142,10 @@ class ProvisionerBot:
("gptmail_keys", self.cmd_gptmail_keys),
("gptmail_add", self.cmd_gptmail_add),
("gptmail_del", self.cmd_gptmail_del),
("cloudmail", self.cmd_cloudmail),
("cloudmail_token", self.cmd_cloudmail_token),
("cloudmail_api", self.cmd_cloudmail_api),
("cloudmail_domains", self.cmd_cloudmail_domains),
("iban_list", self.cmd_iban_list),
("iban_add", self.cmd_iban_add),
("iban_clear", self.cmd_iban_clear),
@@ -192,6 +196,18 @@ class ProvisionerBot:
self.callback_autogptplus,
pattern="^autogptplus:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_run_all,
pattern="^run_all:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_run,
pattern="^run:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_cloudmail,
pattern="^cloudmail:"
))
# 注册自定义数量输入处理器 (GPT Team 注册)
self.app.add_handler(MessageHandler(
@@ -253,7 +269,7 @@ class ProvisionerBot:
BotCommand("logs_live", "启用实时日志推送"),
BotCommand("logs_stop", "停止实时日志推送"),
# 任务控制
BotCommand("run", "处理指定 Team"),
BotCommand("run", "选择数量和邮箱开始处理"),
BotCommand("run_all", "处理所有 Team"),
BotCommand("resume", "继续处理未完成账号"),
BotCommand("stop", "停止当前任务"),
@@ -279,6 +295,11 @@ class ProvisionerBot:
BotCommand("gptmail_add", "添加 GPTMail API Key"),
BotCommand("gptmail_del", "删除 GPTMail API Key"),
BotCommand("test_email", "测试邮箱创建"),
# Cloud Mail
BotCommand("cloudmail", "Cloud Mail 管理面板"),
BotCommand("cloudmail_token", "设置 Cloud Mail Token"),
BotCommand("cloudmail_api", "设置 Cloud Mail API 地址"),
BotCommand("cloudmail_domains", "Cloud Mail 域名管理"),
# IBAN 管理
BotCommand("iban_list", "查看 IBAN 列表"),
BotCommand("iban_add", "添加 IBAN"),
@@ -316,7 +337,7 @@ class ProvisionerBot:
/logs_stop - 停止实时日志推送
<b>🚀 任务控制:</b>
/run &lt;n&gt; - 开始处理第 n 个 Team
/run - 选择数量和邮箱服务开始处理
/run_all - 开始处理所有 Team
/resume - 继续处理未完成账号
/stop - 停止当前任务
@@ -352,6 +373,12 @@ class ProvisionerBot:
/gptmail_del &lt;key&gt; - 删除 API Key
/test_email - 测试邮箱创建
<b>☁️ Cloud Mail 管理:</b>
/cloudmail - Cloud Mail 管理面板
/cloudmail_token &lt;token&gt; - 设置 API Token
/cloudmail_api &lt;url&gt; - 设置 API 地址
/cloudmail_domains - 域名管理 (查看/添加/删除)
<b>💳 IBAN 管理 (GPT Team):</b>
/iban_list - 查看 IBAN 列表
/iban_add &lt;ibans&gt; - 添加 IBAN (每行一个或逗号分隔)
@@ -1047,78 +1074,71 @@ class ProvisionerBot:
@admin_only
async def cmd_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理指定 Team"""
"""启动处理指定数量的 Team - 交互式选择"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
if not context.args:
await update.message.reply_text("用法: /run <序号>\n示例: /run 0")
if not TEAMS:
await update.message.reply_text("📭 team.json 中没有账号")
return
try:
team_idx = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 无效的序号,必须是数字")
return
total_teams = len(TEAMS)
if team_idx < 0 or team_idx >= len(TEAMS):
await update.message.reply_text(f"❌ 序号超出范围,有效范围: 0-{len(TEAMS)-1}")
return
# 构建选择按钮
keyboard = [
[
InlineKeyboardButton(f"📦 全部 ({total_teams})", callback_data=f"run:count:{total_teams}"),
],
[
InlineKeyboardButton("✏️ 自定义数量", callback_data="run:custom"),
],
[
InlineKeyboardButton("❌ 取消", callback_data="run:cancel"),
],
]
team_name = TEAMS[team_idx].get("name", f"Team{team_idx}")
self.current_team = team_name
reply_markup = InlineKeyboardMarkup(keyboard)
# 重置停止标志,确保新任务可以正常运行
try:
import run
run._shutdown_requested = False
except Exception:
pass
await update.message.reply_text(f"🚀 开始处理 Team {team_idx}: {team_name}...")
# 在后台线程执行任务
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_team_task,
team_idx
await update.message.reply_text(
f"<b>🚀 启动处理 Team</b>\n\n"
f"{total_teams} 个 Team 可处理\n\n"
f"请选择要处理的数量:",
parse_mode="HTML",
reply_markup=reply_markup
)
# 添加完成回调
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, team_name))
@admin_only
async def cmd_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""启动处理所有 Team"""
"""启动处理所有 Team - 先选择邮箱服务"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
self.current_team = "全部"
# 显示邮箱服务选择界面
keyboard = [
[
InlineKeyboardButton("📧 GPTMail", callback_data="run_all:select_email:gptmail"),
InlineKeyboardButton("☁️ Cloud Mail", callback_data="run_all:select_email:cloudmail"),
],
[
InlineKeyboardButton("❌ 取消", callback_data="run_all:cancel"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
# 重置停止标志,确保新任务可以正常运行
try:
import run
run._shutdown_requested = False
except Exception:
pass
await update.message.reply_text(f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...")
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_all_teams_task
await update.message.reply_text(
f"<b>🚀 启动处理所有 Team</b>\n\n"
f"{len(TEAMS)} 个 Team 待处理\n\n"
f"请选择邮箱服务:",
parse_mode="HTML",
reply_markup=reply_markup
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部"))
@admin_only
async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""继续处理未完成的账号"""
@@ -1224,6 +1244,21 @@ class ProvisionerBot:
return run_all_teams()
def _run_teams_by_count_task(self, count: int):
"""执行指定数量的 Team 任务 (在线程池中运行)"""
from run import run_teams_by_count
from team_service import preload_all_account_ids
from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
from config import DEFAULT_PASSWORD
# 预加载 account_id
preload_all_account_ids()
_tracker = load_team_tracker()
add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
save_team_tracker(_tracker)
return run_teams_by_count(count)
@admin_only
async def cmd_stop(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""强制停止当前任务"""
@@ -2845,6 +2880,271 @@ class ProvisionerBot:
else:
await update.message.reply_text("❌ Key 不存在或删除失败")
# ==================== Cloud Mail 管理命令 ====================
@admin_only
async def cmd_cloudmail(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""Cloud Mail 管理面板"""
from config import EMAIL_API_BASE, EMAIL_API_AUTH, EMAIL_DOMAINS, EMAIL_PROVIDER
# 获取当前配置
api_base = EMAIL_API_BASE or "未配置"
auth_display = "未配置"
if EMAIL_API_AUTH:
if len(EMAIL_API_AUTH) > 10:
auth_display = f"{EMAIL_API_AUTH[:8]}...{EMAIL_API_AUTH[-4:]}"
else:
auth_display = EMAIL_API_AUTH[:4] + "..."
domains_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0
domains_display = ", ".join(EMAIL_DOMAINS[:3]) if EMAIL_DOMAINS else "未配置"
if domains_count > 3:
domains_display += f" (+{domains_count - 3})"
is_active = EMAIL_PROVIDER == "cloudmail"
keyboard = [
[
InlineKeyboardButton("🔑 设置 Token", callback_data="cloudmail:set_token"),
InlineKeyboardButton("🌐 设置 API 地址", callback_data="cloudmail:set_api"),
],
[
InlineKeyboardButton("📧 域名管理", callback_data="cloudmail:domains"),
InlineKeyboardButton("🔄 测试连接", callback_data="cloudmail:test"),
],
[
InlineKeyboardButton(
f"{'' if is_active else ''} 设为当前邮箱服务",
callback_data="cloudmail:activate"
),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"<b>☁️ Cloud Mail 管理</b>\n\n"
f"<b>当前配置:</b>\n"
f" API 地址: <code>{api_base}</code>\n"
f" Token: <code>{auth_display}</code>\n"
f" 域名数量: {domains_count}\n"
f" 域名: {domains_display}\n\n"
f"<b>状态:</b> {'✅ 当前使用中' if is_active else '⬜ 未激活'}\n\n"
f"选择操作:",
parse_mode="HTML",
reply_markup=reply_markup
)
@admin_only
async def cmd_cloudmail_token(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""设置 Cloud Mail API Token"""
if not context.args:
await update.message.reply_text(
"<b>🔑 设置 Cloud Mail Token</b>\n\n"
"用法: <code>/cloudmail_token &lt;token&gt;</code>\n\n"
"此命令会更新 config.toml 中的:\n"
"• [email] api_auth",
parse_mode="HTML"
)
return
new_token = context.args[0].strip()
try:
import tomli_w
import tomllib
from config import CONFIG_FILE
# 读取配置
with open(CONFIG_FILE, "rb") as f:
config = tomllib.load(f)
# 确保 [email] section 存在
if "email" not in config:
config["email"] = {}
# 更新 api_auth
config["email"]["api_auth"] = new_token
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
from config import reload_config
reload_config()
token_display = f"{new_token[:8]}...{new_token[-4:]}" if len(new_token) > 12 else new_token
await update.message.reply_text(
f"<b>✅ Cloud Mail Token 已更新</b>\n\n"
f"新 Token: <code>{token_display}</code>",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ 缺少 tomli_w 依赖\n请运行: uv add tomli_w")
except Exception as e:
await update.message.reply_text(f"❌ 更新失败: {e}")
@admin_only
async def cmd_cloudmail_api(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""设置 Cloud Mail API 地址"""
if not context.args:
await update.message.reply_text(
"<b>🌐 设置 Cloud Mail API 地址</b>\n\n"
"用法: <code>/cloudmail_api &lt;url&gt;</code>\n\n"
"示例: <code>/cloudmail_api https://mail.example.com/api/public</code>",
parse_mode="HTML"
)
return
new_api = context.args[0].strip()
try:
import tomli_w
import tomllib
from config import CONFIG_FILE
# 读取配置
with open(CONFIG_FILE, "rb") as f:
config = tomllib.load(f)
# 确保 [email] section 存在
if "email" not in config:
config["email"] = {}
# 更新 api_base
config["email"]["api_base"] = new_api
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
from config import reload_config
reload_config()
await update.message.reply_text(
f"<b>✅ Cloud Mail API 地址已更新</b>\n\n"
f"新地址: <code>{new_api}</code>",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ 缺少 tomli_w 依赖\n请运行: uv add tomli_w")
except Exception as e:
await update.message.reply_text(f"❌ 更新失败: {e}")
@admin_only
async def cmd_cloudmail_domains(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""管理 Cloud Mail 域名"""
from config import EMAIL_DOMAINS
if not context.args:
# 显示当前域名列表
if not EMAIL_DOMAINS:
await update.message.reply_text(
"<b>📧 Cloud Mail 域名</b>\n\n"
"📭 暂无配置域名\n\n"
"使用 <code>/cloudmail_domains add domain.com</code> 添加",
parse_mode="HTML"
)
return
lines = [f"<b>📧 Cloud Mail 域名 (共 {len(EMAIL_DOMAINS)} 个)</b>\n"]
for i, domain in enumerate(EMAIL_DOMAINS):
lines.append(f"{i+1}. <code>{domain}</code>")
lines.append(f"\n<b>💡 管理:</b>")
lines.append(f"/cloudmail_domains add &lt;domain&gt; - 添加域名")
lines.append(f"/cloudmail_domains del &lt;domain&gt; - 删除域名")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
return
action = context.args[0].lower()
if action == "add" and len(context.args) > 1:
domain = context.args[1].strip()
await self._cloudmail_add_domain(update, domain)
elif action == "del" and len(context.args) > 1:
domain = context.args[1].strip()
await self._cloudmail_del_domain(update, domain)
else:
await update.message.reply_text(
"<b>📧 Cloud Mail 域名管理</b>\n\n"
"用法:\n"
"• <code>/cloudmail_domains</code> - 查看域名列表\n"
"• <code>/cloudmail_domains add domain.com</code> - 添加域名\n"
"• <code>/cloudmail_domains del domain.com</code> - 删除域名",
parse_mode="HTML"
)
async def _cloudmail_add_domain(self, update: Update, domain: str):
"""添加 Cloud Mail 域名"""
try:
import tomli_w
from config import CONFIG_FILE, EMAIL_DOMAINS
import tomllib
if domain in EMAIL_DOMAINS:
await update.message.reply_text(f"⚠️ 域名 {domain} 已存在")
return
# 读取配置
with open(CONFIG_FILE, "rb") as f:
config = tomllib.load(f)
# 确保 [email] section 存在
if "email" not in config:
config["email"] = {}
# 更新 domains 列表
current_domains = config["email"].get("domains", [])
if domain not in current_domains:
current_domains.append(domain)
config["email"]["domains"] = current_domains
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
from config import reload_config
reload_config()
await update.message.reply_text(f"✅ 已添加域名: {domain}")
except ImportError:
await update.message.reply_text("❌ 缺少 tomli_w 依赖\n请运行: uv add tomli_w")
except Exception as e:
await update.message.reply_text(f"❌ 添加失败: {e}")
async def _cloudmail_del_domain(self, update: Update, domain: str):
"""删除 Cloud Mail 域名"""
try:
import tomli_w
from config import CONFIG_FILE, EMAIL_DOMAINS
import tomllib
if domain not in EMAIL_DOMAINS:
await update.message.reply_text(f"⚠️ 域名 {domain} 不存在")
return
# 读取配置
with open(CONFIG_FILE, "rb") as f:
config = tomllib.load(f)
# 更新 domains 列表
if "email" in config and "domains" in config["email"]:
config["email"]["domains"] = [d for d in config["email"]["domains"] if d != domain]
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
from config import reload_config
reload_config()
await update.message.reply_text(f"✅ 已删除域名: {domain}")
except ImportError:
await update.message.reply_text("❌ 缺少 tomli_w 依赖\n请运行: uv add tomli_w")
except Exception as e:
await update.message.reply_text(f"❌ 删除失败: {e}")
@admin_only
async def cmd_test_email(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""测试邮箱创建功能"""
@@ -3928,6 +4228,453 @@ class ProvisionerBot:
return InlineKeyboardMarkup(keyboard)
async def callback_run(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 run 命令的回调 - 选择数量和邮箱服务"""
query = update.callback_query
# 权限检查
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.answer("⛔ 无权限", show_alert=True)
return
await query.answer()
data = query.data.split(":")
action = data[1] if len(data) > 1 else ""
value = data[2] if len(data) > 2 else ""
if action == "cancel":
await query.edit_message_text("❌ 已取消")
return
if action == "custom":
# 提示用户输入自定义数量
context.user_data["waiting_run_count"] = True
context.user_data["run_message_id"] = query.message.message_id
await query.edit_message_text(
"<b>✏️ 自定义数量</b>\n\n"
f"请输入要处理的 Team 数量 (1-{len(TEAMS)}):",
parse_mode="HTML"
)
return
if action == "count":
# 选择了数量,显示邮箱服务选择
count = int(value)
context.user_data["run_team_count"] = count
keyboard = [
[
InlineKeyboardButton("📧 GPTMail", callback_data=f"run:email:gptmail"),
InlineKeyboardButton("☁️ Cloud Mail", callback_data=f"run:email:cloudmail"),
],
[
InlineKeyboardButton("⬅️ 返回", callback_data="run:back"),
InlineKeyboardButton("❌ 取消", callback_data="run:cancel"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"<b>🚀 启动处理 Team</b>\n\n"
f"已选择: <b>{count}</b> 个 Team\n\n"
f"请选择邮箱服务:",
parse_mode="HTML",
reply_markup=reply_markup
)
return
if action == "back":
# 返回数量选择
total_teams = len(TEAMS)
keyboard = [
[
InlineKeyboardButton(f"📦 全部 ({total_teams})", callback_data=f"run:count:{total_teams}"),
],
[
InlineKeyboardButton("✏️ 自定义数量", callback_data="run:custom"),
],
[
InlineKeyboardButton("❌ 取消", callback_data="run:cancel"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"<b>🚀 启动处理 Team</b>\n\n"
f"{total_teams} 个 Team 可处理\n\n"
f"请选择要处理的数量:",
parse_mode="HTML",
reply_markup=reply_markup
)
return
if action == "email":
# 选择了邮箱服务,开始测试连接并执行任务
email_provider = value
count = context.user_data.get("run_team_count", 1)
await query.edit_message_text(
f"<b>⏳ 正在测试邮箱服务连接...</b>\n\n"
f"邮箱服务: {'GPTMail' if email_provider == 'gptmail' else 'Cloud Mail'}",
parse_mode="HTML"
)
# 测试邮箱连接
success, message = await self._test_email_provider_connection(email_provider)
if not success:
keyboard = [
[
InlineKeyboardButton("🔄 重试", callback_data=f"run:email:{email_provider}"),
InlineKeyboardButton("❌ 取消", callback_data="run:cancel"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"<b>❌ 邮箱服务连接失败</b>\n\n"
f"邮箱服务: {'GPTMail' if email_provider == 'gptmail' else 'Cloud Mail'}\n"
f"错误: {message}\n\n"
f"请检查配置后重试",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 连接成功,更新配置并开始任务
await self._update_email_provider(email_provider)
await query.edit_message_text(
f"<b>✅ 邮箱服务连接成功</b>\n\n"
f"邮箱服务: {'GPTMail' if email_provider == 'gptmail' else 'Cloud Mail'}\n"
f"响应: {message}\n\n"
f"🚀 开始处理 {count} 个 Team...",
parse_mode="HTML"
)
# 启动任务
if count == 1:
self.current_team = TEAMS[0].get("name", "Team0")
else:
self.current_team = f"{count}"
# 重置停止标志
try:
import run
run._shutdown_requested = False
except Exception:
pass
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_teams_by_count_task,
count
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, self.current_team))
async def callback_run_all(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 run_all 邮箱选择回调"""
query = update.callback_query
# 权限检查
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.answer("⛔ 无权限", show_alert=True)
return
await query.answer()
data = query.data.split(":")
action = data[1] if len(data) > 1 else ""
value = data[2] if len(data) > 2 else ""
if action == "cancel":
await query.edit_message_text("❌ 已取消")
return
if action == "select_email":
# 选择了邮箱服务,开始测试连接
email_provider = value # gptmail 或 cloudmail
await query.edit_message_text(
f"<b>⏳ 正在测试邮箱服务连接...</b>\n\n"
f"邮箱服务: {'GPTMail' if email_provider == 'gptmail' else 'Cloud Mail'}",
parse_mode="HTML"
)
# 测试邮箱连接
success, message = await self._test_email_provider_connection(email_provider)
if not success:
keyboard = [
[
InlineKeyboardButton("🔄 重试", callback_data=f"run_all:select_email:{email_provider}"),
InlineKeyboardButton("❌ 取消", callback_data="run_all:cancel"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"<b>❌ 邮箱服务连接失败</b>\n\n"
f"邮箱服务: {'GPTMail' if email_provider == 'gptmail' else 'Cloud Mail'}\n"
f"错误: {message}\n\n"
f"请检查配置后重试",
parse_mode="HTML",
reply_markup=reply_markup
)
return
# 连接成功,更新配置并开始任务
await self._update_email_provider(email_provider)
await query.edit_message_text(
f"<b>✅ 邮箱服务连接成功</b>\n\n"
f"邮箱服务: {'GPTMail' if email_provider == 'gptmail' else 'Cloud Mail'}\n"
f"响应: {message}\n\n"
f"🚀 开始处理所有 Team (共 {len(TEAMS)} 个)...",
parse_mode="HTML"
)
# 启动任务
self.current_team = "全部"
# 重置停止标志
try:
import run
run._shutdown_requested = False
except Exception:
pass
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_all_teams_task
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部"))
async def _test_email_provider_connection(self, provider: str) -> tuple:
"""测试邮箱服务连接
Args:
provider: 邮箱服务类型 (gptmail / cloudmail)
Returns:
tuple: (success, message)
"""
import requests
try:
if provider == "gptmail":
# 测试 GPTMail
from config import GPTMAIL_API_BASE, get_gptmail_keys
keys = get_gptmail_keys()
if not keys:
return False, "没有配置 GPTMail API Key"
api_key = keys[0]
url = f"{GPTMAIL_API_BASE}/api/mail/list"
headers = {
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
payload = {"email": "test@test.com", "limit": 1}
response = requests.post(url, headers=headers, json=payload, timeout=10)
if response.status_code == 200:
return True, "API 连接正常"
else:
return False, f"HTTP {response.status_code}"
elif provider == "cloudmail":
# 测试 Cloud Mail
from config import EMAIL_API_BASE, EMAIL_API_AUTH, EMAIL_DOMAINS
if not EMAIL_API_BASE:
return False, "未配置 email.api_base"
if not EMAIL_API_AUTH:
return False, "未配置 email.api_auth"
if not EMAIL_DOMAINS:
return False, "未配置 email.domains"
url = f"{EMAIL_API_BASE}/emailList"
headers = {
"Authorization": EMAIL_API_AUTH,
"Content-Type": "application/json"
}
payload = {
"toEmail": f"test@{EMAIL_DOMAINS[0]}",
"timeSort": "desc",
"size": 1
}
response = requests.post(url, headers=headers, json=payload, timeout=10)
data = response.json()
if data.get("code") == 200:
return True, "API 连接正常"
else:
return False, data.get("message", "未知错误")
return False, "未知的邮箱服务类型"
except requests.exceptions.Timeout:
return False, "连接超时"
except requests.exceptions.ConnectionError:
return False, "无法连接服务器"
except Exception as e:
return False, str(e)
async def _update_email_provider(self, provider: str):
"""更新邮箱服务配置
Args:
provider: 邮箱服务类型 (gptmail / cloudmail)
"""
try:
import tomli_w
from config import CONFIG_FILE
# 读取当前配置
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
content = f.read()
# 更新 email_provider
import re
if re.search(r'^email_provider\s*=', content, re.MULTILINE):
content = re.sub(
r'^(email_provider\s*=\s*).*$',
f'\\g<1>"{provider}"',
content,
flags=re.MULTILINE
)
else:
# 在文件开头添加
content = f'email_provider = "{provider}"\n' + content
# 写回配置文件
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
f.write(content)
# 重载配置
from config import reload_config
reload_config()
except Exception as e:
log.error(f"更新邮箱配置失败: {e}")
async def callback_cloudmail(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 Cloud Mail 回调"""
query = update.callback_query
# 权限检查
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.answer("⛔ 无权限", show_alert=True)
return
await query.answer()
data = query.data.split(":")
action = data[1] if len(data) > 1 else ""
if action == "set_token":
await query.edit_message_text(
"<b>🔑 设置 Cloud Mail Token</b>\n\n"
"请使用命令设置:\n"
"<code>/cloudmail_token &lt;your_token&gt;</code>",
parse_mode="HTML"
)
elif action == "set_api":
await query.edit_message_text(
"<b>🌐 设置 Cloud Mail API 地址</b>\n\n"
"请使用命令设置:\n"
"<code>/cloudmail_api &lt;url&gt;</code>\n\n"
"示例:\n"
"<code>/cloudmail_api https://mail.example.com/api/public</code>",
parse_mode="HTML"
)
elif action == "domains":
await query.edit_message_text(
"<b>📧 Cloud Mail 域名管理</b>\n\n"
"请使用命令管理:\n"
"• <code>/cloudmail_domains</code> - 查看域名列表\n"
"• <code>/cloudmail_domains add domain.com</code> - 添加域名\n"
"• <code>/cloudmail_domains del domain.com</code> - 删除域名",
parse_mode="HTML"
)
elif action == "test":
await query.edit_message_text("⏳ 正在测试 Cloud Mail 连接...")
success, message = await self._test_email_provider_connection("cloudmail")
if success:
await query.edit_message_text(
f"<b>✅ Cloud Mail 连接成功</b>\n\n"
f"状态: {message}",
parse_mode="HTML"
)
else:
await query.edit_message_text(
f"<b>❌ Cloud Mail 连接失败</b>\n\n"
f"错误: {message}",
parse_mode="HTML"
)
elif action == "activate":
await self._update_email_provider("cloudmail")
await query.answer("✅ 已切换到 Cloud Mail", show_alert=True)
# 刷新面板
from config import EMAIL_API_BASE, EMAIL_API_AUTH, EMAIL_DOMAINS
api_base = EMAIL_API_BASE or "未配置"
auth_display = "未配置"
if EMAIL_API_AUTH:
if len(EMAIL_API_AUTH) > 10:
auth_display = f"{EMAIL_API_AUTH[:8]}...{EMAIL_API_AUTH[-4:]}"
else:
auth_display = EMAIL_API_AUTH[:4] + "..."
domains_count = len(EMAIL_DOMAINS) if EMAIL_DOMAINS else 0
domains_display = ", ".join(EMAIL_DOMAINS[:3]) if EMAIL_DOMAINS else "未配置"
if domains_count > 3:
domains_display += f" (+{domains_count - 3})"
keyboard = [
[
InlineKeyboardButton("🔑 设置 Token", callback_data="cloudmail:set_token"),
InlineKeyboardButton("🌐 设置 API 地址", callback_data="cloudmail:set_api"),
],
[
InlineKeyboardButton("📧 域名管理", callback_data="cloudmail:domains"),
InlineKeyboardButton("🔄 测试连接", callback_data="cloudmail:test"),
],
[
InlineKeyboardButton("✅ 当前邮箱服务", callback_data="cloudmail:activate"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"<b>☁️ Cloud Mail 管理</b>\n\n"
f"<b>当前配置:</b>\n"
f" API 地址: <code>{api_base}</code>\n"
f" Token: <code>{auth_display}</code>\n"
f" 域名数量: {domains_count}\n"
f" 域名: {domains_display}\n\n"
f"<b>状态:</b> ✅ 当前使用中\n\n"
f"选择操作:",
parse_mode="HTML",
reply_markup=reply_markup
)
async def callback_autogptplus(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 AutoGPTPlus 回调"""
query = update.callback_query
@@ -5028,7 +5775,7 @@ class ProvisionerBot:
@admin_only
async def handle_team_custom_count(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文本输入 (GPT Team 自定义数量 / AutoGPTPlus Token / 域名 / IBAN)"""
"""处理文本输入 (GPT Team 自定义数量 / AutoGPTPlus Token / 域名 / IBAN / run 自定义数量)"""
# 处理 AutoGPTPlus Token 输入
if context.user_data.get("autogptplus_waiting_token"):
@@ -5048,6 +5795,12 @@ class ProvisionerBot:
await self._handle_autogptplus_iban_input(update, context)
return
# 处理 /run 自定义数量输入
if context.user_data.get("waiting_run_count"):
context.user_data["waiting_run_count"] = False
await self._handle_run_custom_count_input(update, context)
return
# 处理 GPT Team 自定义数量输入
if not context.user_data.get("team_waiting_count"):
return # 不在等待状态,忽略消息
@@ -5092,6 +5845,48 @@ class ProvisionerBot:
reply_markup=reply_markup
)
async def _handle_run_custom_count_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /run 自定义数量输入"""
text = update.message.text.strip()
total_teams = len(TEAMS)
try:
count = int(text)
if count < 1 or count > total_teams:
await update.message.reply_text(
f"❌ 数量必须在 1-{total_teams} 之间\n\n"
"请重新使用 /run 开始"
)
return
except ValueError:
await update.message.reply_text(
"❌ 请输入有效的数字\n\n"
"请重新使用 /run 开始"
)
return
# 保存数量并显示邮箱选择
context.user_data["run_team_count"] = count
keyboard = [
[
InlineKeyboardButton("📧 GPTMail", callback_data=f"run:email:gptmail"),
InlineKeyboardButton("☁️ Cloud Mail", callback_data=f"run:email:cloudmail"),
],
[
InlineKeyboardButton("❌ 取消", callback_data="run:cancel"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"<b>🚀 启动处理 Team</b>\n\n"
f"已选择: <b>{count}</b> 个 Team\n\n"
f"请选择邮箱服务:",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _handle_autogptplus_token_input(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 AutoGPTPlus Token 输入 - 保存后立即测试"""
import tomli_w