This commit is contained in:
2026-01-24 06:07:31 +08:00
parent 421caca1b7
commit 7c4688895e
3 changed files with 2238 additions and 1 deletions

1523
auto_gpt_team.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -220,3 +220,17 @@ notify_on_error = true
check_interval = 3600
# 低库存预警阈值 (正常账号数低于此值时预警)
low_stock_threshold = 10
# ==================== AutoGPTPlus 配置 ====================
# 独立的 ChatGPT 订阅自动化脚本配置
[autogptplus]
# Cloud Mail API Token
mail_api_token = "your-cloud-mail-token"
# Cloud Mail API 地址
mail_api_base = "https://your-cloud-mail.com"
# 可用邮箱域名列表
email_domains = ["@example.com", "@example.org"]
# SEPA IBAN 列表 (也可通过 Bot /iban_add 命令导入到 sepa_ibans.txt)
sepa_ibans = []
# 是否启用随机指纹 (User-Agent, WebGL, 分辨率等)
random_fingerprint = true

View File

@@ -3,6 +3,7 @@
import asyncio
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from pathlib import Path
@@ -130,6 +131,15 @@ class ProvisionerBot:
("gptmail_keys", self.cmd_gptmail_keys),
("gptmail_add", self.cmd_gptmail_add),
("gptmail_del", self.cmd_gptmail_del),
("iban_list", self.cmd_iban_list),
("iban_add", self.cmd_iban_add),
("iban_clear", self.cmd_iban_clear),
("domain_list", self.cmd_domain_list),
("domain_add", self.cmd_domain_add),
("domain_del", self.cmd_domain_del),
("domain_clear", self.cmd_domain_clear),
("team_fingerprint", self.cmd_team_fingerprint),
("team_register", self.cmd_team_register),
("test_email", self.cmd_test_email),
("include_owners", self.cmd_include_owners),
("reload", self.cmd_reload),
@@ -161,6 +171,16 @@ class ProvisionerBot:
self.callback_clean_teams,
pattern="^clean_teams:"
))
self.app.add_handler(CallbackQueryHandler(
self.callback_team_register,
pattern="^team_reg:"
))
# 注册自定义数量输入处理器 (GPT Team 注册)
self.app.add_handler(MessageHandler(
filters.TEXT & ~filters.COMMAND,
self.handle_team_custom_count
))
# 注册定时检查任务
if TELEGRAM_CHECK_INTERVAL > 0 and AUTH_PROVIDER == "s2a":
@@ -285,10 +305,27 @@ class ProvisionerBot:
/gptmail_del <key> - 删除 API Key
/test_email - 测试邮箱创建
<b>💳 IBAN 管理 (GPT Team):</b>
/iban_list - 查看 IBAN 列表
/iban_add &lt;ibans&gt; - 添加 IBAN (每行一个或逗号分隔)
/iban_clear - 清空 IBAN 列表
<b>📧 域名管理 (GPT Team):</b>
/domain_list - 查看邮箱域名列表
/domain_add &lt;domains&gt; - 添加域名 (每行一个或逗号分隔)
/domain_del &lt;domain&gt; - 删除指定域名
/domain_clear - 清空域名列表
<b>🤖 GPT Team:</b>
/team_fingerprint - 开启/关闭随机指纹
/team_register - 开始自动订阅注册
<b>💡 示例:</b>
<code>/list</code> - 查看所有待处理账号
<code>/run 0</code> - 处理第一个 Team
<code>/gptmail_add my-api-key</code> - 添加 Key"""
<code>/gptmail_add my-api-key</code> - 添加 Key
<code>/iban_add DE123...,DE456...</code> - 添加 IBAN
<code>/domain_add @example.com</code> - 添加域名"""
await update.message.reply_text(help_text, parse_mode="HTML")
@admin_only
@@ -2338,6 +2375,669 @@ class ProvisionerBot:
except Exception as e:
await update.message.reply_text(f"❌ 测试失败: {e}")
@admin_only
async def cmd_iban_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看 IBAN 列表"""
try:
from auto_gpt_team import get_sepa_ibans
ibans = get_sepa_ibans()
if not ibans:
await update.message.reply_text(
"<b>💳 SEPA IBAN 列表</b>\n\n"
"📭 暂无 IBAN\n\n"
"使用 /iban_add 添加 IBAN",
parse_mode="HTML"
)
return
# 显示 IBAN 列表
lines = [f"<b>💳 SEPA IBAN 列表 ({len(ibans)} 个)</b>\n"]
for i, iban in enumerate(ibans[:50], 1): # 最多显示 50 个
lines.append(f"{i}. <code>{iban}</code>")
if len(ibans) > 50:
lines.append(f"\n... 还有 {len(ibans) - 50} 个未显示")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 获取 IBAN 列表失败: {e}")
@admin_only
async def cmd_iban_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""添加 IBAN"""
if not context.args:
await update.message.reply_text(
"<b>💳 添加 IBAN</b>\n\n"
"用法:\n"
"<code>/iban_add DE123... DE456...</code>\n"
"<code>/iban_add DE123...,DE456...</code>\n\n"
"支持空格或逗号分隔,每行一个也可以",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import add_sepa_ibans
# 解析输入 (支持空格、逗号、换行分隔)
raw_input = " ".join(context.args)
# 替换逗号和换行为空格,然后按空格分割
ibans = [s.strip() for s in raw_input.replace(",", " ").replace("\n", " ").split() if s.strip()]
if not ibans:
await update.message.reply_text("❌ 未提供有效的 IBAN")
return
added, skipped, total = add_sepa_ibans(ibans)
await update.message.reply_text(
f"<b>✅ IBAN 导入完成</b>\n\n"
f"新增: {added}\n"
f"跳过 (重复): {skipped}\n"
f"当前总数: {total}",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 添加 IBAN 失败: {e}")
@admin_only
async def cmd_iban_clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清空 IBAN 列表"""
# 需要确认
if not context.args or context.args[0].lower() != "confirm":
try:
from auto_gpt_team import get_sepa_ibans
count = len(get_sepa_ibans())
except:
count = 0
await update.message.reply_text(
f"<b>⚠️ 确认清空 IBAN 列表?</b>\n\n"
f"当前共有 {count} 个 IBAN\n\n"
f"确认请发送:\n"
f"<code>/iban_clear confirm</code>",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import clear_sepa_ibans
clear_sepa_ibans()
await update.message.reply_text("<b>✅ IBAN 列表已清空</b>", parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 清空 IBAN 失败: {e}")
@admin_only
async def cmd_domain_list(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""查看邮箱域名列表"""
try:
from auto_gpt_team import get_email_domains
domains = get_email_domains()
if not domains:
await update.message.reply_text(
"<b>📧 邮箱域名列表</b>\n\n"
"📭 暂无域名\n\n"
"使用 /domain_add 添加域名",
parse_mode="HTML"
)
return
# 显示域名列表
lines = [f"<b>📧 邮箱域名列表 ({len(domains)} 个)</b>\n"]
for i, domain in enumerate(domains[:50], 1): # 最多显示 50 个
lines.append(f"{i}. <code>{domain}</code>")
if len(domains) > 50:
lines.append(f"\n... 还有 {len(domains) - 50} 个未显示")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 获取域名列表失败: {e}")
@admin_only
async def cmd_domain_add(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""添加邮箱域名"""
if not context.args:
await update.message.reply_text(
"<b>📧 添加邮箱域名</b>\n\n"
"用法:\n"
"<code>/domain_add @example.com</code>\n"
"<code>/domain_add @a.com,@b.com</code>\n\n"
"支持空格或逗号分隔\n"
"@ 符号可省略,会自动添加",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import add_email_domains
# 解析输入 (支持空格、逗号、换行分隔)
raw_input = " ".join(context.args)
# 替换逗号和换行为空格,然后按空格分割
domains = [s.strip() for s in raw_input.replace(",", " ").replace("\n", " ").split() if s.strip()]
if not domains:
await update.message.reply_text("❌ 未提供有效的域名")
return
added, skipped, total = add_email_domains(domains)
await update.message.reply_text(
f"<b>✅ 域名导入完成</b>\n\n"
f"新增: {added}\n"
f"跳过 (重复): {skipped}\n"
f"当前总数: {total}",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 添加域名失败: {e}")
@admin_only
async def cmd_domain_del(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""删除指定域名"""
if not context.args:
await update.message.reply_text(
"<b>📧 删除域名</b>\n\n"
"用法:\n"
"<code>/domain_del @example.com</code>\n\n"
"@ 符号可省略",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import remove_email_domain, get_email_domains
domain = context.args[0].strip()
if remove_email_domain(domain):
total = len(get_email_domains())
await update.message.reply_text(
f"<b>✅ 域名已删除</b>\n\n"
f"已删除: <code>{domain}</code>\n"
f"剩余: {total}",
parse_mode="HTML"
)
else:
await update.message.reply_text(
f"❌ 域名不存在: <code>{domain}</code>",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 删除域名失败: {e}")
@admin_only
async def cmd_domain_clear(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""清空域名列表"""
# 需要确认
if not context.args or context.args[0].lower() != "confirm":
try:
from auto_gpt_team import get_email_domains
count = len(get_email_domains())
except:
count = 0
await update.message.reply_text(
f"<b>⚠️ 确认清空域名列表?</b>\n\n"
f"当前共有 {count} 个域名\n\n"
f"确认请发送:\n"
f"<code>/domain_clear confirm</code>",
parse_mode="HTML"
)
return
try:
from auto_gpt_team import clear_email_domains
clear_email_domains()
await update.message.reply_text("<b>✅ 域名列表已清空</b>", parse_mode="HTML")
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
except Exception as e:
await update.message.reply_text(f"❌ 清空域名失败: {e}")
@admin_only
async def cmd_team_fingerprint(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""切换 GPT Team 随机指纹"""
import tomli_w
try:
# 读取当前配置
with open(CONFIG_FILE, "rb") as f:
import tomllib
config = tomllib.load(f)
# 确保 GPT Team section 存在
if "GPT Team" not in config:
config["GPT Team"] = {}
# 获取当前状态
current = config.get("GPT Team", {}).get("random_fingerprint", True)
new_value = not current
# 更新配置
config["GPT Team"]["random_fingerprint"] = new_value
# 写回文件
with open(CONFIG_FILE, "wb") as f:
tomli_w.dump(config, f)
status = "✅ 已开启" if new_value else "❌ 已关闭"
await update.message.reply_text(
f"<b>🎭 GPT Team 随机指纹</b>\n\n"
f"状态: {status}\n\n"
f"开启后每次运行将随机使用不同的:\n"
f"• User-Agent (Chrome 139-144)\n"
f"• WebGL 显卡指纹 (NVIDIA/AMD/Intel)\n"
f"• 屏幕分辨率 (1080p/1440p/4K)\n\n"
f"💡 下次运行 auto_gpt_team.py 时生效",
parse_mode="HTML"
)
except ImportError:
await update.message.reply_text(
"❌ 缺少 tomli_w 依赖\n"
"请运行: uv add tomli_w"
)
except Exception as e:
await update.message.reply_text(f"❌ 修改配置失败: {e}")
@admin_only
async def cmd_team_register(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""开始 GPT Team 自动订阅注册"""
# 检查是否有任务正在运行
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 有任务正在运行: {self.current_team}\n"
"请等待任务完成或使用 /stop 停止后再开始注册"
)
return
# 检查配置
try:
from auto_gpt_team import MAIL_API_TOKEN, MAIL_API_BASE, get_email_domains, get_sepa_ibans
if not MAIL_API_TOKEN or not MAIL_API_BASE:
await update.message.reply_text(
"<b>❌ 配置错误</b>\n\n"
"请在 config.toml 中配置 [GPT Team] 段:\n"
"• mail_api_token\n"
"• mail_api_base",
parse_mode="HTML"
)
return
domains = get_email_domains()
if not domains:
await update.message.reply_text(
"<b>❌ 没有可用的邮箱域名</b>\n\n"
"请先使用 /domain_add 导入域名",
parse_mode="HTML"
)
return
ibans = get_sepa_ibans()
if not ibans:
await update.message.reply_text(
"<b>❌ 没有可用的 IBAN</b>\n\n"
"请先使用 /iban_add 导入 IBAN",
parse_mode="HTML"
)
return
except ImportError:
await update.message.reply_text("❌ auto_gpt_team 模块未找到")
return
# 显示数量选择
keyboard = [
[
InlineKeyboardButton("1 个", callback_data="team_reg:count:1"),
InlineKeyboardButton("3 个", callback_data="team_reg:count:3"),
InlineKeyboardButton("5 个", callback_data="team_reg:count:5"),
],
[
InlineKeyboardButton("10 个", callback_data="team_reg:count:10"),
InlineKeyboardButton("20 个", callback_data="team_reg:count:20"),
InlineKeyboardButton("自定义", callback_data="team_reg:count:custom"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"<b>🚀 GPT Team 自动订阅</b>\n\n"
f"📧 邮箱域名: {len(domains)}\n"
f"💳 可用 IBAN: {len(ibans)}\n\n"
"请选择注册数量:",
parse_mode="HTML",
reply_markup=reply_markup
)
async def callback_team_register(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 GPT Team 注册回调"""
query = update.callback_query
# 权限检查
user_id = update.effective_user.id
if user_id not in TELEGRAM_ADMIN_CHAT_IDS:
await query.answer("⛔ 无权限", show_alert=True)
return
await query.answer()
data = query.data.split(":")
action = data[1] if len(data) > 1 else ""
value = data[2] if len(data) > 2 else ""
if action == "count":
if value == "custom":
await query.edit_message_text(
"<b>📝 自定义数量</b>\n\n"
"请发送数量 (1-50):\n"
"直接回复一个数字即可\n\n"
"例如: <code>20</code>",
parse_mode="HTML"
)
# 设置等待输入状态
context.user_data["team_waiting_count"] = True
return
count = int(value)
# 显示输出方式选择
keyboard = [
[
InlineKeyboardButton("📄 JSON 文件", callback_data=f"team_reg:output:json:{count}"),
],
[
InlineKeyboardButton("📥 添加到 team.json", callback_data=f"team_reg:output:team:{count}"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
f"<b>⚙️ 配置完成</b>\n\n"
f"注册数量: {count}\n\n"
f"请选择输出方式:",
parse_mode="HTML",
reply_markup=reply_markup
)
elif action == "output":
output_type = value # json 或 team
count = int(data[3]) if len(data) > 3 else 1
await query.edit_message_text(
f"<b>⚙️ 配置完成</b>\n\n"
f"注册数量: {count}\n"
f"输出方式: {'📄 JSON 文件' if output_type == 'json' else '📥 team.json'}\n\n"
f"即将开始完整注册流程...",
parse_mode="HTML"
)
# 开始注册任务
self.current_team = f"GPT Team 注册 ({count}个)"
self.current_task = asyncio.create_task(
self._run_team_registration(query.message.chat_id, count, output_type)
)
async def _run_team_registration(self, chat_id: int, count: int, output_type: str):
"""执行 GPT Team 注册任务"""
from auto_gpt_team import run_single_registration, cleanup_chrome_processes
import json
import threading
results = []
success_count = 0
fail_count = 0
# 当前步骤 (用于显示)
current_step = ["初始化..."]
current_account = [""]
step_lock = threading.Lock()
def step_callback(step: str):
"""步骤回调 - 更新当前步骤"""
with step_lock:
current_step[0] = step
# 发送开始消息
progress_msg = await self.app.bot.send_message(
chat_id,
f"<b>🚀 开始注册</b>\n\n"
f"进度: 0/{count}\n"
f"{'' * 20}",
parse_mode="HTML"
)
# 进度更新任务
async def update_progress_loop():
"""定期更新进度消息"""
last_step = ""
while True:
await asyncio.sleep(1.5) # 每 1.5 秒更新一次
try:
with step_lock:
step = current_step[0]
account = current_account[0]
# 只有步骤变化时才更新
if step != last_step:
last_step = step
progress = int((success_count + fail_count) / count * 20) if count > 0 else 0
progress_bar = '' * progress + '' * (20 - progress)
text = (
f"<b>🚀 注册中...</b>\n\n"
f"进度: {success_count + fail_count}/{count}\n"
f"{progress_bar}\n\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {fail_count}\n"
)
if account:
text += f"\n⏳ 账号: <code>{account[:20]}...</code>"
if step:
text += f"\n{step}"
try:
await progress_msg.edit_text(text, parse_mode="HTML")
except:
pass
except asyncio.CancelledError:
break
except:
pass
# 启动进度更新任务
progress_task = asyncio.create_task(update_progress_loop())
for i in range(count):
# 检查停止请求
try:
import run
if run._shutdown_requested:
break
except:
pass
# 执行注册
try:
# 使用 functools.partial 传递回调
import functools
def run_with_callback():
return run_single_registration(
progress_callback=None,
step_callback=step_callback
)
# 更新当前账号
with step_lock:
current_step[0] = "生成账号信息..."
current_account[0] = f"{i+1}"
result = await asyncio.get_event_loop().run_in_executor(
self.executor,
run_with_callback
)
if result.get("success"):
success_count += 1
results.append({
"account": result["account"],
"password": result["password"],
"token": result["token"],
"account_id": result.get("account_id", "")
})
with step_lock:
current_account[0] = result["account"]
else:
fail_count += 1
log.warning(f"注册失败: {result.get('error', '未知错误')}")
except Exception as e:
fail_count += 1
log.error(f"注册异常: {e}")
# 清理浏览器进程
cleanup_chrome_processes()
# 停止进度更新任务
progress_task.cancel()
try:
await progress_task
except asyncio.CancelledError:
pass
# 完成进度
progress_bar = '' * 20
await progress_msg.edit_text(
f"<b>🎉 注册完成!</b> {success_count}/{count}\n"
f"{progress_bar}\n\n"
f"✅ 成功: {success_count}\n"
f"❌ 失败: {fail_count}",
parse_mode="HTML"
)
# 处理结果
if results:
if output_type == "json":
# 生成 JSON 文件
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"team_accounts_{timestamp}.json"
filepath = Path(filename)
with open(filepath, "w", encoding="utf-8") as f:
json.dump(results, f, ensure_ascii=False, indent=2)
# 发送文件
await self.app.bot.send_document(
chat_id,
document=open(filepath, "rb"),
filename=filename,
caption=f"📄 注册结果 ({success_count} 个账号)"
)
# 删除临时文件
filepath.unlink()
elif output_type == "team":
# 添加到 team.json
try:
team_file = TEAM_JSON_FILE
existing = []
if team_file.exists():
with open(team_file, "r", encoding="utf-8") as f:
existing = json.load(f)
existing.extend(results)
with open(team_file, "w", encoding="utf-8") as f:
json.dump(existing, f, ensure_ascii=False, indent=2)
# 重载配置
reload_config()
await self.app.bot.send_message(
chat_id,
f"<b>✅ 已添加到 team.json</b>\n\n"
f"新增: {success_count} 个账号\n"
f"当前总数: {len(existing)}",
parse_mode="HTML"
)
except Exception as e:
await self.app.bot.send_message(
chat_id,
f"❌ 保存到 team.json 失败: {e}"
)
self.current_task = None
self.current_team = None
@admin_only
async def handle_team_custom_count(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 GPT Team 自定义数量输入"""
# 检查是否在等待输入状态
if not context.user_data.get("team_waiting_count"):
return # 不在等待状态,忽略消息
# 清除等待状态
context.user_data["team_waiting_count"] = False
text = update.message.text.strip()
# 验证输入
try:
count = int(text)
if count < 1 or count > 50:
await update.message.reply_text(
"❌ 数量必须在 1-50 之间\n\n"
"请重新使用 /team_register 开始"
)
return
except ValueError:
await update.message.reply_text(
"❌ 请输入有效的数字\n\n"
"请重新使用 /team_register 开始"
)
return
# 显示输出方式选择
keyboard = [
[
InlineKeyboardButton("📄 JSON 文件", callback_data=f"team_reg:output:json:{count}"),
],
[
InlineKeyboardButton("📥 添加到 team.json", callback_data=f"team_reg:output:team:{count}"),
],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"<b>⚙️ 配置完成</b>\n\n"
f"注册数量: {count}\n\n"
f"请选择输出方式:",
parse_mode="HTML",
reply_markup=reply_markup
)
async def main():
"""主函数"""