Files
AutoDoneTeam/tg_bot.py
2026-01-13 10:56:44 +08:00

674 lines
23 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Telegram Bot for OpenAI Account Registration
Features:
- /start - 开始使用机器人
- /register - 注册单个账号
- /batch <count> - 批量注册账号
- /payment - 完整注册(含支付)
- /help - 帮助信息
Registration Modes:
1. Basic - 注册 + 邮箱验证
2. Billing - 注册 + 邮箱验证 + 欧洲账单 URL
3. Payment - 注册 + 邮箱验证 + 账单 URL + SEPA 支付方式
"""
import os
import sys
import json
import asyncio
import secrets
import string
from typing import Optional
from datetime import datetime
# Telegram Bot
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
ContextTypes,
MessageHandler,
filters
)
# Local modules
from modules.tempmail import TempMailClient
from flow import CompleteRegistrationFlow
from config import TEMPMAIL_CONFIG, DEBUG
# Bot configuration
BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
ALLOWED_USER_IDS = os.getenv('ALLOWED_USER_IDS', '') # Comma-separated user IDs
# Parse allowed users
ALLOWED_USERS = set()
if ALLOWED_USER_IDS:
try:
ALLOWED_USERS = set(int(uid.strip()) for uid in ALLOWED_USER_IDS.split(',') if uid.strip())
except ValueError:
print("⚠️ Warning: Invalid ALLOWED_USER_IDS format. Bot will be public!")
def generate_random_password(length: int = 16) -> str:
"""生成随机密码"""
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*'
return ''.join(secrets.choice(chars) for _ in range(length))
def check_authorization(user_id: int) -> bool:
"""检查用户是否有权限使用 bot"""
if not ALLOWED_USERS:
return True # 如果未配置白名单,允许所有用户
return user_id in ALLOWED_USERS
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /start 命令"""
user = update.effective_user
if not check_authorization(user.id):
await update.message.reply_text(
"❌ 你没有权限使用此机器人。\n"
"请联系管理员获取访问权限。"
)
return
keyboard = [
[InlineKeyboardButton("📝 注册单个账号", callback_data="register_single")],
[InlineKeyboardButton("📦 批量注册", callback_data="register_batch")],
[InlineKeyboardButton("💳 完整注册(含支付)", callback_data="register_payment")],
[InlineKeyboardButton("❓ 帮助", callback_data="help")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"👋 你好 {user.first_name}!\n\n"
"🤖 OpenAI 账号注册机器人\n\n"
"功能:\n"
"• 自动注册 OpenAI 账号\n"
"• 自动验证邮箱\n"
"• 获取 Access Token\n"
"• 生成欧洲账单 URL (可选)\n"
"• 自动添加支付方式 SEPA (可选)\n\n"
"请选择操作:",
reply_markup=reply_markup
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /help 命令"""
help_text = """
📖 **使用帮助**
**命令列表:**
/start - 开始使用
/register - 注册单个账号
/batch <数量> - 批量注册 (例: /batch 5)
/payment - 完整注册(含支付)
/help - 显示此帮助
**注册模式:**
1⃣ **基础注册** - 仅注册账号和验证邮箱
2⃣ **账单注册** - 注册 + 生成欧洲账单 URL
3⃣ **完整注册** - 注册 + 账单 + 自动添加 SEPA 支付
**注册流程:**
1. 选择注册模式 (单个/批量/完整)
2. 选择是否生成账单 URL
3. 如需支付,选择地区信息(德国/美国/自定义)
4. 等待注册完成 (通常 30-90秒)
5. 接收账号信息
**账号信息包含:**
• 邮箱地址
• 密码
• Access Token
• 账单 URL (如已选择)
• IBAN (如已添加支付)
**支付说明:**
• 自动生成德国 IBAN符合 ISO 7064 标准)
• 使用 SEPA 支付方式
• 每个账号使用唯一 IBAN
• 支持自定义账单地址信息
**注意事项:**
• 账号密码会自动生成
• 邮箱使用临时邮箱服务
• 请保存好收到的账号信息
• 批量注册建议不超过 20 个
如有问题,请联系管理员。
"""
if update.message:
await update.message.reply_text(help_text, parse_mode='Markdown')
elif update.callback_query:
await update.callback_query.message.reply_text(help_text, parse_mode='Markdown')
async def register_single(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理单个账号注册"""
user_id = update.effective_user.id
if not check_authorization(user_id):
await update.message.reply_text("❌ 你没有权限使用此机器人。")
return
# 询问是否需要生成账单
keyboard = [
[InlineKeyboardButton("✅ 生成账单 URL", callback_data="reg_single_with_billing")],
[InlineKeyboardButton("❌ 不生成账单", callback_data="reg_single_no_billing")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"🔹 单个账号注册\n\n"
"是否需要生成欧洲账单 URL?",
reply_markup=reply_markup
)
async def batch_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理批量注册命令"""
user_id = update.effective_user.id
if not check_authorization(user_id):
await update.message.reply_text("❌ 你没有权限使用此机器人。")
return
# 解析数量
try:
count = int(context.args[0]) if context.args else 0
if count <= 0 or count > 20:
await update.message.reply_text(
"❌ 请提供有效的数量 (1-20)\n\n"
"示例: /batch 5"
)
return
# 保存数量到上下文
context.user_data['batch_count'] = count
# 询问是否需要生成账单
keyboard = [
[InlineKeyboardButton("✅ 生成账单 URL", callback_data="reg_batch_with_billing")],
[InlineKeyboardButton("❌ 不生成账单", callback_data="reg_batch_no_billing")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"🔹 批量注册 {count} 个账号\n\n"
"是否需要生成欧洲账单 URL?",
reply_markup=reply_markup
)
except (IndexError, ValueError):
await update.message.reply_text(
"❌ 请提供有效的数量\n\n"
"示例: /batch 5"
)
async def payment_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理完整注册命令(含支付)"""
user_id = update.effective_user.id
if not check_authorization(user_id):
await update.message.reply_text("❌ 你没有权限使用此机器人。")
return
# 询问数量
keyboard = [
[InlineKeyboardButton("1⃣ 单个账号", callback_data="payment_count_1")],
[InlineKeyboardButton("5⃣ 5个账号", callback_data="payment_count_5")],
[InlineKeyboardButton("🔟 10个账号", callback_data="payment_count_10")],
[InlineKeyboardButton("📝 自定义数量", callback_data="payment_count_custom")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"💳 **完整注册(含支付)**\n\n"
"此模式将执行:\n"
"✅ 注册 OpenAI 账号\n"
"✅ 验证邮箱\n"
"✅ 生成欧洲账单 URL\n"
"✅ 自动添加 SEPA 支付方式\n"
"✅ 生成唯一德国 IBAN\n\n"
"请选择注册数量:",
reply_markup=reply_markup,
parse_mode='Markdown'
)
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理按钮回调"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
if not check_authorization(user_id):
await query.message.reply_text("❌ 你没有权限使用此机器人。")
return
data = query.data
# Payment registration callbacks
if data == "register_payment":
keyboard = [
[InlineKeyboardButton("1⃣ 单个账号", callback_data="payment_count_1")],
[InlineKeyboardButton("5⃣ 5个账号", callback_data="payment_count_5")],
[InlineKeyboardButton("🔟 10个账号", callback_data="payment_count_10")],
[InlineKeyboardButton("📝 自定义数量", callback_data="payment_count_custom")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
"💳 **完整注册(含支付)**\n\n"
"此模式将执行:\n"
"✅ 注册 OpenAI 账号\n"
"✅ 验证邮箱\n"
"✅ 生成欧洲账单 URL\n"
"✅ 自动添加 SEPA 支付方式\n"
"✅ 生成唯一德国 IBAN\n\n"
"请选择注册数量:",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data.startswith("payment_count_"):
# 处理数量选择
if data == "payment_count_custom":
await query.message.edit_text(
"📝 **自定义数量**\n\n"
"请发送数量 (1-20):\n"
"直接回复一个数字即可",
parse_mode='Markdown'
)
context.user_data['awaiting_payment_count'] = True
return
# 提取数量
count_map = {
"payment_count_1": 1,
"payment_count_5": 5,
"payment_count_10": 10
}
count = count_map.get(data, 1)
context.user_data['payment_count'] = count
# 询问地区
keyboard = [
[InlineKeyboardButton("🇩🇪 德国地址", callback_data="payment_region_de")],
[InlineKeyboardButton("🇺🇸 美国地址", callback_data="payment_region_us")],
[InlineKeyboardButton("🌍 使用默认", callback_data="payment_region_default")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
f"✅ **注册数量:{count} 个**\n\n"
"请选择账单地址信息:\n\n"
"🇩🇪 德国地址 - 使用德国信息\n"
"🇺🇸 美国地址 - 使用美国信息\n"
"🌍 使用默认 - 使用默认配置",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data.startswith("payment_region_"):
count = context.user_data.get('payment_count', 1)
# 显示开始信息
region_name = {
"payment_region_de": "🇩🇪 德国",
"payment_region_us": "🇺🇸 美国",
"payment_region_default": "🌍 默认"
}.get(data, "🌍 默认")
await query.message.edit_text(
f"⚙️ **配置完成**\n\n"
f"注册数量:{count}\n"
f"地址信息:{region_name}\n\n"
f"即将开始完整注册流程...",
parse_mode='Markdown'
)
# 根据地区设置默认信息
region_info = {
"payment_region_de": {
"name": "Hans Mueller",
"address_line1": "Hauptstraße 123",
"city": "Berlin",
"postal_code": "10115",
"state": "BE",
"country": "DE"
},
"payment_region_us": {
"name": "John Doe",
"address_line1": "123 Main Street",
"city": "New York",
"postal_code": "10001",
"state": "NY",
"country": "US"
},
"payment_region_default": {
"name": "John Doe",
"address_line1": "123 Main Street",
"city": "New York",
"postal_code": "10001",
"state": "NY",
"country": "US"
}
}
payment_info = region_info.get(data, region_info["payment_region_default"])
await perform_registration(query.message, count, generate_billing=True, add_payment=True, payment_info=payment_info)
# Original callbacks
elif data == "register_single":
keyboard = [
[InlineKeyboardButton("✅ 生成账单 URL", callback_data="reg_single_with_billing")],
[InlineKeyboardButton("❌ 不生成账单", callback_data="reg_single_no_billing")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
"📝 **单个账号注册**\n\n"
"是否需要生成欧洲账单 URL?\n\n"
"✅ 生成账单 - 包含账单链接\n"
"❌ 不生成 - 仅注册和验证",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data == "register_batch":
await query.message.edit_text(
"📦 **批量注册**\n\n"
"请使用命令: `/batch <数量>`\n\n"
"示例: `/batch 5`",
parse_mode='Markdown'
)
elif data == "help":
await help_command(update, context)
elif data.startswith("reg_single_"):
generate_billing = "with_billing" in data
await perform_registration(query.message, 1, generate_billing)
elif data.startswith("reg_batch_"):
count = context.user_data.get('batch_count', 0)
if count <= 0:
await query.message.reply_text(
"❌ 请先使用 /batch <数量> 命令"
)
return
generate_billing = "with_billing" in data
await perform_registration(query.message, count, generate_billing)
async def perform_registration(message, count: int, generate_billing: bool, add_payment: bool = False, payment_info: dict = None):
"""执行注册流程(使用 CompleteRegistrationFlow"""
# 发送开始消息
status_text = (
f"🔄 开始注册 {count} 个账号...\n"
f"{'✅ 将生成账单 URL' if generate_billing else '❌ 不生成账单'}\n"
)
if add_payment:
status_text += "✅ 将添加支付方式\n"
status_text += "✅ 将保存到 Notion\n"
status_text += "\n⏳ 请稍候..."
status_msg = await message.reply_text(status_text)
# 初始化临时邮箱客户端
try:
api_base_url = TEMPMAIL_CONFIG.get('api_base_url')
username = TEMPMAIL_CONFIG.get('username')
password_cfg = TEMPMAIL_CONFIG.get('password')
admin_token = TEMPMAIL_CONFIG.get('admin_token')
if username and password_cfg:
tempmail_client = TempMailClient(
api_base_url=api_base_url,
username=username,
password=password_cfg
)
elif admin_token:
tempmail_client = TempMailClient(
api_base_url=api_base_url,
admin_token=admin_token
)
else:
await status_msg.edit_text("❌ 临时邮箱配置错误,请联系管理员")
return
except Exception as e:
await status_msg.edit_text(f"❌ 初始化失败: {str(e)}")
return
# 创建完整流程管理器
try:
flow = CompleteRegistrationFlow(tempmail_client)
if DEBUG:
print(f"✅ CompleteRegistrationFlow 初始化成功")
except Exception as e:
await status_msg.edit_text(f"❌ 流程管理器初始化失败: {str(e)}")
return
# 注册账号
success_accounts = []
failed_accounts = []
for i in range(1, count + 1):
try:
# 更新状态
progress_bar = "" * i + "" * (count - i)
await status_msg.edit_text(
f"🔄 **注册进度** [{i}/{count}]\n"
f"{progress_bar}\n\n"
f"✅ 成功: {len(success_accounts)}\n"
f"❌ 失败: {len(failed_accounts)}\n\n"
f"⏳ 正在处理第 {i} 个账号...",
parse_mode='Markdown'
)
# 生成密码
password = generate_random_password()
# 根据模式执行不同的注册流程
if add_payment:
# 完整流程:注册 + 账单 + 支付 + Notion
result = flow.register_with_payment(
password=password,
payment_info=payment_info,
save_to_notion=True
)
elif generate_billing:
# 账单流程:注册 + 账单
result = flow.register_with_billing(password=password)
else:
# 基础流程:仅注册
result = flow.register_basic(password=password)
if not result.get('success'):
failed_accounts.append({
'email': result.get('email', 'N/A'),
'error': result.get('error', 'Unknown error')
})
continue
success_accounts.append(result)
except Exception as e:
failed_accounts.append({
'email': 'N/A',
'error': str(e)
})
# 发送结果摘要
progress_bar = "" * count
await status_msg.edit_text(
f"🎉 **注册完成!** [{count}/{count}]\n"
f"{progress_bar}\n\n"
f"✅ 成功: **{len(success_accounts)}**\n"
f"❌ 失败: **{len(failed_accounts)}**\n\n"
f"📨 正在发送账号信息...",
parse_mode='Markdown'
)
# 发送每个成功的账号
for idx, acc in enumerate(success_accounts, 1):
account_text = (
f"╔═══════════════════\n"
f"║ 🎯 **账号 #{idx}**\n"
f"╚═══════════════════\n\n"
f"📧 **邮箱**\n"
f"`{acc['email']}`\n\n"
f"🔑 **密码**\n"
f"`{acc['password']}`\n"
)
if 'access_token' in acc:
account_text += f"\n🎫 **Access Token**\n`{acc['access_token'][:50]}...`\n"
if 'checkout_url' in acc:
account_text += f"\n💳 **账单链接**\n[点击打开支付页面]({acc['checkout_url']})\n"
elif 'billing_error' in acc:
account_text += f"\n⚠️ 账单: {acc['billing_error']}\n"
# 支付信息
if 'payment_added' in acc and acc['payment_added']:
account_text += f"\n✅ **支付方式**\nSEPA 已添加\n"
if 'iban' in acc:
account_text += f"\n🏦 **IBAN**\n`{acc['iban']}`\n"
elif 'payment_error' in acc:
account_text += f"\n⚠️ 支付: {acc['payment_error']}\n"
# Notion 保存状态
if 'notion_saved' in acc and acc['notion_saved']:
account_text += f"\n📝 **Notion**\n✅ 已保存到数据库\n"
elif 'notion_error' in acc:
account_text += f"\n⚠️ Notion: {acc['notion_error']}\n"
account_text += "\n━━━━━━━━━━━━━━━━━━━"
await message.reply_text(account_text, parse_mode='Markdown')
# 如果有失败的账号
if failed_accounts:
failed_text = "⚠️ **失败列表**\n\n"
for idx, acc in enumerate(failed_accounts, 1):
failed_text += f"`{idx}.` {acc['email']}\n{acc['error']}\n\n"
await message.reply_text(failed_text, parse_mode='Markdown')
# 更新最终状态消息
await status_msg.edit_text(
f"✅ **全部完成!**\n\n"
f"📊 **统计**\n"
f"• 总数: {count}\n"
f"• 成功: {len(success_accounts)}\n"
f"• 失败: {len(failed_accounts)}\n\n"
f"{'✅ 账号信息已发送' if success_accounts else '❌ 没有成功的账号'}",
parse_mode='Markdown'
)
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文本消息(用于自定义数量输入)"""
user_id = update.effective_user.id
if not check_authorization(user_id):
return
# 检查是否在等待支付数量输入
if context.user_data.get('awaiting_payment_count'):
try:
count = int(update.message.text.strip())
if count <= 0 or count > 20:
await update.message.reply_text(
"❌ 请提供有效的数量 (1-20)\n\n"
"请重新输入数字:"
)
return
# 清除等待标志
context.user_data['awaiting_payment_count'] = False
context.user_data['payment_count'] = count
# 询问地区
keyboard = [
[InlineKeyboardButton("🇩🇪 德国地址", callback_data="payment_region_de")],
[InlineKeyboardButton("🇺🇸 美国地址", callback_data="payment_region_us")],
[InlineKeyboardButton("🌍 使用默认", callback_data="payment_region_default")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ 将注册 {count} 个完整账号\n\n"
"请选择账单地址信息:",
reply_markup=reply_markup
)
except ValueError:
await update.message.reply_text(
"❌ 请输入有效的数字 (1-20)\n\n"
"请重新输入:"
)
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""全局错误处理"""
print(f"❌ Error: {context.error}")
if update and update.effective_message:
await update.effective_message.reply_text(
f"❌ 发生错误: {str(context.error)}\n\n"
"请稍后重试或联系管理员。"
)
def main():
"""启动 bot"""
if not BOT_TOKEN:
print("❌ Error: TELEGRAM_BOT_TOKEN not set!")
print("Please set environment variable: export TELEGRAM_BOT_TOKEN='your_token'")
sys.exit(1)
# 检查配置
api_base_url = TEMPMAIL_CONFIG.get('api_base_url')
if not api_base_url or 'your.tempmail.domain' in api_base_url:
print("❌ Error: TEMPMAIL_CONFIG not configured in config.py")
sys.exit(1)
print("🤖 Starting Telegram Bot...")
print(f"📋 Allowed Users: {len(ALLOWED_USERS) if ALLOWED_USERS else 'ALL (Public)'}")
# 创建应用
application = Application.builder().token(BOT_TOKEN).build()
# 添加处理器
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("register", register_single))
application.add_handler(CommandHandler("batch", batch_register))
application.add_handler(CommandHandler("payment", payment_register))
application.add_handler(CallbackQueryHandler(button_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
# 错误处理
application.add_error_handler(error_handler)
# 启动 bot
print("✅ Bot started successfully!")
print("Press Ctrl+C to stop")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()