修改机器人,优化用户体验

This commit is contained in:
dela
2026-01-26 23:43:31 +08:00
parent 8756bce4f8
commit 8066771856
8 changed files with 1522 additions and 737 deletions

View File

@@ -47,8 +47,8 @@ POW_CONFIG = {
# Sentinel 求解器配置 # Sentinel 求解器配置
SENTINEL_CONFIG = { SENTINEL_CONFIG = {
# 使用纯 Python 实现(不需要 Node.js # 使用纯 Python 原生实现(默认,推荐
# True: 使用 sentinel_native.py推荐,无外部依赖 # True: 使用 sentinel_native.py无需 Node.js推荐)
# False: 使用 js_executor.py + sdk.js需要 Node.js # False: 使用 js_executor.py + sdk.js需要 Node.js
'use_native': True, 'use_native': True,
} }

View File

@@ -29,7 +29,7 @@ try:
from modules.fingerprint import BrowserFingerprint from modules.fingerprint import BrowserFingerprint
from modules.http_client import HTTPClient from modules.http_client import HTTPClient
from modules.sentinel_solver import SentinelSolver from modules.sentinel_solver import SentinelSolver
from modules.pow_solver import ProofOfWorkSolver from modules.sentinel_native import PowSolver
MODULES_AVAILABLE = True MODULES_AVAILABLE = True
print("✅ Project modules loaded") print("✅ Project modules loaded")
except ImportError as e: except ImportError as e:
@@ -57,7 +57,7 @@ class OpenAILogin:
self.session = self.http_client.session # 保持兼容性 self.session = self.http_client.session # 保持兼容性
use_native = SENTINEL_CONFIG.get('use_native', True) use_native = SENTINEL_CONFIG.get('use_native', True)
self.sentinel_solver = SentinelSolver(self.fingerprint, use_native=use_native) self.sentinel_solver = SentinelSolver(self.fingerprint, use_native=use_native)
self.pow_solver = ProofOfWorkSolver() self.pow_solver = PowSolver()
print(f"✅ Using HTTPClient with project modules (native={use_native})") print(f"✅ Using HTTPClient with project modules (native={use_native})")
else: else:
# 降级使用原始 session # 降级使用原始 session

View File

@@ -1,13 +1,15 @@
# modules/__init__.py # modules/__init__.py
"""Sentinel Bypass 模块包""" """Sentinel Bypass 模块包
默认使用纯 Python 原生实现(无需 Node.js
"""
from .fingerprint import BrowserFingerprint from .fingerprint import BrowserFingerprint
from .js_executor import JSExecutor
from .sentinel_solver import SentinelSolver from .sentinel_solver import SentinelSolver
from .http_client import HTTPClient from .http_client import HTTPClient
from .register import OpenAIRegistrar from .register import OpenAIRegistrar
# 纯 Python 实现(不依赖 Node.js # 纯 Python 实现(默认推荐
from .sentinel_native import ( from .sentinel_native import (
NativeSentinelSolver, NativeSentinelSolver,
PowSolver, PowSolver,
@@ -18,14 +20,20 @@ from .sentinel_native import (
generate_requirements_token_simple, generate_requirements_token_simple,
) )
# Node.js 实现(可选,需要安装 Node.js
try:
from .js_executor import JSExecutor
_HAS_JS_EXECUTOR = True
except ImportError:
_HAS_JS_EXECUTOR = False
__all__ = [ __all__ = [
# 原有组件(依赖 Node.js # 核心组件
'BrowserFingerprint', 'BrowserFingerprint',
'JSExecutor',
'SentinelSolver', 'SentinelSolver',
'HTTPClient', 'HTTPClient',
'OpenAIRegistrar', 'OpenAIRegistrar',
# 纯 Python 组件 # 纯 Python 组件(推荐)
'NativeSentinelSolver', 'NativeSentinelSolver',
'PowSolver', 'PowSolver',
'TurnstileSolver', 'TurnstileSolver',
@@ -34,3 +42,7 @@ __all__ = [
'solve_pow_simple', 'solve_pow_simple',
'generate_requirements_token_simple', 'generate_requirements_token_simple',
] ]
# 仅当 Node.js 可用时导出 JSExecutor
if _HAS_JS_EXECUTOR:
__all__.append('JSExecutor')

View File

@@ -1,114 +0,0 @@
import time
import json
import base64
from typing import List
DEBUG = True
class ProofOfWorkSolver:
"""解决 OpenAI Sentinel 的 Proof of Work challenge"""
def __init__(self):
# FNV-1a 常量
self.FNV_OFFSET = 2166136261
self.FNV_PRIME = 16777619
def fnv1a_hash(self, data: str) -> str:
"""FNV-1a hash 算法"""
hash_value = self.FNV_OFFSET
for char in data:
hash_value ^= ord(char)
hash_value = (hash_value * self.FNV_PRIME) & 0xFFFFFFFF
# 额外的混合步骤(从 JS 代码复制)
hash_value ^= hash_value >> 16
hash_value = (hash_value * 2246822507) & 0xFFFFFFFF
hash_value ^= hash_value >> 13
hash_value = (hash_value * 3266489909) & 0xFFFFFFFF
hash_value ^= hash_value >> 16
# 转为 8 位十六进制字符串
return format(hash_value, '08x')
def serialize_array(self, arr: List) -> str:
"""模拟 JS 的 T() 函数JSON.stringify + Base64"""
json_str = json.dumps(arr, separators=(',', ':'))
return base64.b64encode(json_str.encode()).decode()
def build_fingerprint_array(self, nonce: int, elapsed_ms: int) -> List:
"""构建指纹数组(简化版)"""
return [
0, # [0] screen dimensions
"", # [1] timestamp
0, # [2] memory
nonce, # [3] nonce ← 关键
"", # [4] user agent
"", # [5] random element
"", # [6] script src
"", # [7] language
"", # [8] languages
elapsed_ms, # [9] elapsed time ← 关键
"", # [10] random function
"", # [11] keys
"", # [12] window keys
0, # [13] performance.now()
"", # [14] uuid
"", # [15] URL params
0, # [16] hardware concurrency
0 # [17] timeOrigin
]
def solve(self, seed: str, difficulty: str, max_iterations: int = 10000000) -> str:
"""
解决 PoW challenge
Args:
seed: Challenge seed
difficulty: 目标难度(十六进制字符串)
max_iterations: 最大尝试次数
Returns:
序列化的答案(包含 nonce
"""
if DEBUG:
print(f"[PoW] Solving challenge:")
print(f" Seed: {seed}")
print(f" Difficulty: {difficulty}")
start_time = time.time()
for nonce in range(max_iterations):
elapsed_ms = int((time.time() - start_time) * 1000)
# 构建指纹数组
fingerprint = self.build_fingerprint_array(nonce, elapsed_ms)
# 序列化
serialized = self.serialize_array(fingerprint)
# 计算 hash(seed + serialized)
hash_input = seed + serialized
hash_result = self.fnv1a_hash(hash_input)
# 检查是否满足难度要求
# 比较方式hash 的前 N 位(作为整数)<= difficulty作为整数
difficulty_len = len(difficulty)
hash_prefix = hash_result[:difficulty_len]
if hash_prefix <= difficulty:
elapsed = time.time() - start_time
if DEBUG:
print(f"[PoW] ✓ Found solution in {elapsed:.2f}s")
print(f" Nonce: {nonce}")
print(f" Hash: {hash_result}")
print(f" Serialized: {serialized[:100]}...")
# 返回 serialized + "~S" (表示成功)
return serialized + "~S"
# 每 100k 次迭代打印进度
if DEBUG and nonce > 0 and nonce % 100000 == 0:
print(f"[PoW] Tried {nonce:,} iterations...")
raise Exception(f"Failed to solve PoW after {max_iterations:,} iterations")

View File

@@ -13,7 +13,7 @@ from .sentinel_solver import SentinelSolver
from .http_client import HTTPClient from .http_client import HTTPClient
from .stripe_payment import StripePaymentHandler from .stripe_payment import StripePaymentHandler
from config import AUTH_BASE_URL, DEBUG, TEMPMAIL_CONFIG from config import AUTH_BASE_URL, DEBUG, TEMPMAIL_CONFIG
from modules.pow_solver import ProofOfWorkSolver from .sentinel_native import PowSolver
from modules.tempmail import TempMailClient from modules.tempmail import TempMailClient
# 导入 Sentinel 配置 # 导入 Sentinel 配置
@@ -34,7 +34,7 @@ class OpenAIRegistrar:
self.solver = SentinelSolver(self.fingerprint, use_native=use_native) self.solver = SentinelSolver(self.fingerprint, use_native=use_native)
self.http_client = HTTPClient(self.fingerprint) self.http_client = HTTPClient(self.fingerprint)
self.pow_solver = ProofOfWorkSolver() # 新增 self.pow_solver = PowSolver() # 使用 sentinel_native 中的 PowSolver
self.tempmail_client = tempmail_client # 临时邮箱客户端(可选) self.tempmail_client = tempmail_client # 临时邮箱客户端(可选)
def _step1_init_through_chatgpt(self, email: str): def _step1_init_through_chatgpt(self, email: str):
@@ -310,8 +310,11 @@ class OpenAIRegistrar:
if not seed or not difficulty: if not seed or not difficulty:
raise Exception(f"Missing PoW parameters") raise Exception(f"Missing PoW parameters")
# 解 PoW # 获取指纹配置数组
self.pow_answer = self.pow_solver.solve(seed, difficulty) config = self.fingerprint.get_config_array()
# 解 PoW (静态方法,需要传入 config)
self.pow_answer = PowSolver.solve(seed, difficulty, config)
if DEBUG: if DEBUG:
print(f"✅ [2.6] PoW solved (difficulty: {difficulty})") print(f"✅ [2.6] PoW solved (difficulty: {difficulty})")

View File

@@ -1,9 +1,8 @@
# modules/sentinel_solver.py # modules/sentinel_solver.py
"""Sentinel 挑战求解器 """Sentinel 挑战求解器
支持两种模式: 默认使用纯 Python 原生实现(无需 Node.js
1. Node.js 模式 (默认): 使用 sdk.js 执行 如需使用 Node.js 模式,设置 use_native=False
2. Native 模式: 纯 Python 实现,无需 Node.js
""" """
import json import json
@@ -13,30 +12,30 @@ from typing import Dict, Optional
from .fingerprint import BrowserFingerprint from .fingerprint import BrowserFingerprint
from config import DEBUG from config import DEBUG
# 尝试导入 JSExecutor需要 Node.js # 优先导入纯 Python 实现(推荐
try:
from .js_executor import JSExecutor
HAS_JS_EXECUTOR = True
except Exception:
HAS_JS_EXECUTOR = False
# 尝试导入纯 Python 实现
try: try:
from .sentinel_native import NativeSentinelSolver, PowSolver from .sentinel_native import NativeSentinelSolver, PowSolver
HAS_NATIVE_SOLVER = True HAS_NATIVE_SOLVER = True
except ImportError: except ImportError:
HAS_NATIVE_SOLVER = False HAS_NATIVE_SOLVER = False
# 尝试导入 JSExecutor需要 Node.js作为备选
try:
from .js_executor import JSExecutor
HAS_JS_EXECUTOR = True
except Exception:
HAS_JS_EXECUTOR = False
class SentinelSolver: class SentinelSolver:
"""协调指纹生成和求解器,生成完整的 Sentinel tokens """协调指纹生成和求解器,生成完整的 Sentinel tokens
支持两种模式: 支持两种模式:
- use_native=False (默认): 使用 Node.js + sdk.js - use_native=True (默认): 使用纯 Python 实现,无需 Node.js
- use_native=True: 使用纯 Python 实现 - use_native=False: 使用 Node.js + sdk.js需安装 Node.js
""" """
def __init__(self, fingerprint: BrowserFingerprint, use_native: bool = False): def __init__(self, fingerprint: BrowserFingerprint, use_native: bool = True):
self.fingerprint = fingerprint self.fingerprint = fingerprint
self.use_native = use_native self.use_native = use_native

1291
tg_bot.py

File diff suppressed because it is too large Load Diff

784
tg_bot_old.py Normal file
View File

@@ -0,0 +1,784 @@
#!/usr/bin/env python3
"""Telegram Bot for OpenAI Account Registration
Features:
- /start - 开始使用机器人
- /register - 注册单个账号
- /batch <count> - 批量注册账号
- /payment - 完整注册(含支付)
- /help - 帮助信息
Registration Modes:
1. Basic - 注册 + 邮箱验证
2. Billing - 注册 + 邮箱验证 + 欧洲账单 URL
3. Payment - 注册 + 邮箱验证 + 账单 URL + SEPA 支付方式
"""
import os
import sys
import json
import asyncio
import secrets
import string
from typing import Optional
from datetime import datetime
# Telegram Bot
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CommandHandler,
CallbackQueryHandler,
ContextTypes,
MessageHandler,
filters
)
# Local modules
from modules.tempmail import TempMailClient
from flow import CompleteRegistrationFlow
from config import TEMPMAIL_CONFIG, DEBUG
# Bot configuration
BOT_TOKEN = os.getenv('TELEGRAM_BOT_TOKEN', '')
ALLOWED_USER_IDS = os.getenv('ALLOWED_USER_IDS', '') # Comma-separated user IDs
# Parse allowed users
ALLOWED_USERS = set()
if ALLOWED_USER_IDS:
try:
ALLOWED_USERS = set(int(uid.strip()) for uid in ALLOWED_USER_IDS.split(',') if uid.strip())
except ValueError:
print("⚠️ Warning: Invalid ALLOWED_USER_IDS format. Bot will be public!")
def generate_random_password(length: int = 16) -> str:
"""生成随机密码"""
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*'
return ''.join(secrets.choice(chars) for _ in range(length))
def check_authorization(user_id: int) -> bool:
"""检查用户是否有权限使用 bot"""
if not ALLOWED_USERS:
return True # 如果未配置白名单,允许所有用户
return user_id in ALLOWED_USERS
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /start 命令"""
user = update.effective_user
if not check_authorization(user.id):
await update.message.reply_text(
"❌ 你没有权限使用此机器人。\n"
"请联系管理员获取访问权限。"
)
return
keyboard = [
[InlineKeyboardButton("📝 注册单个账号", callback_data="register_single")],
[InlineKeyboardButton("📦 批量注册", callback_data="register_batch")],
[InlineKeyboardButton("💳 完整注册(含支付)", callback_data="register_payment")],
[InlineKeyboardButton("❓ 帮助", callback_data="help")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"👋 你好 {user.first_name}!\n\n"
"🤖 OpenAI 账号注册机器人\n\n"
"功能:\n"
"• 自动注册 OpenAI 账号\n"
"• 自动验证邮箱\n"
"• 获取 Access Token\n"
"• 生成欧洲账单 URL (可选)\n"
"• 自动添加支付方式 SEPA (可选)\n\n"
"请选择操作:",
reply_markup=reply_markup
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理 /help 命令"""
help_text = """
📖 **使用帮助**
**命令列表:**
/start - 开始使用
/register - 注册单个账号
/batch <数量> - 批量注册 (例: /batch 5)
/payment - 完整注册(含支付)
/help - 显示此帮助
**注册模式:**
1⃣ **基础注册** - 仅注册账号和验证邮箱
2⃣ **账单注册** - 注册 + 生成欧洲账单 URL
3⃣ **完整注册** - 注册 + 账单 + 自动添加 SEPA 支付
**注册流程:**
1. 选择注册模式 (单个/批量/完整)
2. 选择是否生成账单 URL
3. 如需支付,选择地区信息(德国/美国/自定义)
4. 等待注册完成 (通常 30-90秒)
5. 接收账号信息
**账号信息包含:**
• 邮箱地址
• 密码
• Access Token
• 账单 URL (如已选择)
• IBAN (如已添加支付)
**支付说明:**
• 自动生成德国 IBAN符合 ISO 7064 标准)
• 使用 SEPA 支付方式
• 每个账号使用唯一 IBAN
• 支持自定义账单地址信息
**注意事项:**
• 账号密码会自动生成
• 邮箱使用临时邮箱服务
• 请保存好收到的账号信息
• 批量注册建议不超过 20 个
如有问题,请联系管理员。
"""
if update.message:
await update.message.reply_text(help_text, parse_mode='Markdown')
elif update.callback_query:
await update.callback_query.message.reply_text(help_text, parse_mode='Markdown')
async def register_single(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理单个账号注册"""
user_id = update.effective_user.id
if not check_authorization(user_id):
await update.message.reply_text("❌ 你没有权限使用此机器人。")
return
# 询问是否需要生成账单
keyboard = [
[InlineKeyboardButton("✅ 生成账单 URL", callback_data="reg_single_with_billing")],
[InlineKeyboardButton("❌ 不生成账单", callback_data="reg_single_no_billing")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"🔹 单个账号注册\n\n"
"是否需要生成欧洲账单 URL?",
reply_markup=reply_markup
)
async def batch_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理批量注册命令"""
user_id = update.effective_user.id
if not check_authorization(user_id):
await update.message.reply_text("❌ 你没有权限使用此机器人。")
return
# 解析数量
try:
count = int(context.args[0]) if context.args else 0
if count <= 0 or count > 20:
await update.message.reply_text(
"❌ 请提供有效的数量 (1-20)\n\n"
"示例: /batch 5"
)
return
# 保存数量到上下文
context.user_data['batch_count'] = count
# 询问是否需要生成账单
keyboard = [
[InlineKeyboardButton("✅ 生成账单 URL", callback_data="reg_batch_with_billing")],
[InlineKeyboardButton("❌ 不生成账单", callback_data="reg_batch_no_billing")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"🔹 批量注册 {count} 个账号\n\n"
"是否需要生成欧洲账单 URL?",
reply_markup=reply_markup
)
except (IndexError, ValueError):
await update.message.reply_text(
"❌ 请提供有效的数量\n\n"
"示例: /batch 5"
)
async def payment_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理完整注册命令(含支付)"""
user_id = update.effective_user.id
if not check_authorization(user_id):
await update.message.reply_text("❌ 你没有权限使用此机器人。")
return
# 询问数量
keyboard = [
[InlineKeyboardButton("1⃣ 单个账号", callback_data="payment_count_1")],
[InlineKeyboardButton("5⃣ 5个账号", callback_data="payment_count_5")],
[InlineKeyboardButton("🔟 10个账号", callback_data="payment_count_10")],
[InlineKeyboardButton("📝 自定义数量", callback_data="payment_count_custom")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"💳 **完整注册(含支付)**\n\n"
"此模式将执行:\n"
"✅ 注册 OpenAI 账号\n"
"✅ 验证邮箱\n"
"✅ 生成欧洲账单 URL\n"
"✅ 自动添加 SEPA 支付方式\n"
"✅ 生成唯一德国 IBAN\n\n"
"请选择注册数量:",
reply_markup=reply_markup,
parse_mode='Markdown'
)
async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理按钮回调"""
query = update.callback_query
await query.answer()
user_id = query.from_user.id
if not check_authorization(user_id):
await query.message.reply_text("❌ 你没有权限使用此机器人。")
return
data = query.data
# Payment registration callbacks
if data == "register_payment":
keyboard = [
[InlineKeyboardButton("1⃣ 单个账号", callback_data="payment_count_1")],
[InlineKeyboardButton("5⃣ 5个账号", callback_data="payment_count_5")],
[InlineKeyboardButton("🔟 10个账号", callback_data="payment_count_10")],
[InlineKeyboardButton("📝 自定义数量", callback_data="payment_count_custom")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
"💳 **完整注册(含支付)**\n\n"
"此模式将执行:\n"
"✅ 注册 OpenAI 账号\n"
"✅ 验证邮箱\n"
"✅ 生成欧洲账单 URL\n"
"✅ 自动添加 SEPA 支付方式\n"
"✅ 生成唯一德国 IBAN\n\n"
"请选择注册数量:",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data.startswith("payment_count_"):
# 处理数量选择
if data == "payment_count_custom":
await query.message.edit_text(
"📝 **自定义数量**\n\n"
"请发送数量 (1-20):\n"
"直接回复一个数字即可",
parse_mode='Markdown'
)
context.user_data['awaiting_payment_count'] = True
return
# 提取数量
count_map = {
"payment_count_1": 1,
"payment_count_5": 5,
"payment_count_10": 10
}
count = count_map.get(data, 1)
context.user_data['payment_count'] = count
# 询问地区
keyboard = [
[InlineKeyboardButton("🇩🇪 德国地址", callback_data="payment_region_de")],
[InlineKeyboardButton("🇺🇸 美国地址", callback_data="payment_region_us")],
[InlineKeyboardButton("🌍 使用默认", callback_data="payment_region_default")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
f"✅ **注册数量:{count} 个**\n\n"
"请选择账单地址信息:\n\n"
"🇩🇪 德国地址 - 使用德国信息\n"
"🇺🇸 美国地址 - 使用美国信息\n"
"🌍 使用默认 - 使用默认配置",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data.startswith("payment_region_"):
count = context.user_data.get('payment_count', 1)
# 保存地区选择
context.user_data['payment_region'] = data
# 显示地区名称
region_name = {
"payment_region_de": "🇩🇪 德国",
"payment_region_us": "🇺🇸 美国",
"payment_region_default": "🌍 默认"
}.get(data, "🌍 默认")
# 询问输出方式
keyboard = [
[InlineKeyboardButton("📝 Notion 数据库", callback_data="payment_output_notion")],
[InlineKeyboardButton("📄 JSON 文件", callback_data="payment_output_json")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
f"✅ **地址信息:{region_name}**\n\n"
f"请选择输出方式:\n\n"
f"📝 **Notion 数据库** - 自动保存到 Notion\n"
f"📄 **JSON 文件** - 生成 JSON 格式输出\n\n"
f"JSON 格式示例:\n"
f"```json\n"
f'{{\n'
f' "account": "email@example.com",\n'
f' "password": "yourpassword",\n'
f' "token": "eyJ..."\n'
f'}}\n'
f"```",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data.startswith("payment_output_"):
count = context.user_data.get('payment_count', 1)
region_data = context.user_data.get('payment_region', 'payment_region_default')
# 保存输出格式选择
output_format = "json" if data == "payment_output_json" else "notion"
context.user_data['output_format'] = output_format
# 显示地区和输出方式
region_name = {
"payment_region_de": "🇩🇪 德国",
"payment_region_us": "🇺🇸 美国",
"payment_region_default": "🌍 默认"
}.get(region_data, "🌍 默认")
output_name = "📝 Notion 数据库" if output_format == "notion" else "📄 JSON 文件"
await query.message.edit_text(
f"⚙️ **配置完成**\n\n"
f"注册数量:{count}\n"
f"地址信息:{region_name}\n"
f"输出方式:{output_name}\n\n"
f"即将开始完整注册流程...",
parse_mode='Markdown'
)
# 根据地区设置默认信息
region_info = {
"payment_region_de": {
"name": "Hans Mueller",
"address_line1": "Hauptstraße 123",
"city": "Berlin",
"postal_code": "10115",
"state": "BE",
"country": "DE"
},
"payment_region_us": {
"name": "John Doe",
"address_line1": "123 Main Street",
"city": "New York",
"postal_code": "10001",
"state": "NY",
"country": "US"
},
"payment_region_default": {
"name": "John Doe",
"address_line1": "123 Main Street",
"city": "New York",
"postal_code": "10001",
"state": "NY",
"country": "US"
}
}
payment_info = region_info.get(region_data, region_info["payment_region_default"])
await perform_registration(
query.message,
count,
generate_billing=True,
add_payment=True,
payment_info=payment_info,
output_format=output_format
)
# Original callbacks
elif data == "register_single":
keyboard = [
[InlineKeyboardButton("✅ 生成账单 URL", callback_data="reg_single_with_billing")],
[InlineKeyboardButton("❌ 不生成账单", callback_data="reg_single_no_billing")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
"📝 **单个账号注册**\n\n"
"是否需要生成欧洲账单 URL?\n\n"
"✅ 生成账单 - 包含账单链接\n"
"❌ 不生成 - 仅注册和验证",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data == "register_batch":
await query.message.edit_text(
"📦 **批量注册**\n\n"
"请使用命令: `/batch <数量>`\n\n"
"示例: `/batch 5`",
parse_mode='Markdown'
)
elif data == "help":
await help_command(update, context)
elif data.startswith("reg_single_"):
generate_billing = "with_billing" in data
await perform_registration(query.message, 1, generate_billing)
elif data.startswith("reg_batch_"):
count = context.user_data.get('batch_count', 0)
if count <= 0:
await query.message.reply_text(
"❌ 请先使用 /batch <数量> 命令"
)
return
generate_billing = "with_billing" in data
await perform_registration(query.message, count, generate_billing)
async def perform_registration(message, count: int, generate_billing: bool, add_payment: bool = False, payment_info: dict = None, output_format: str = "notion"):
"""执行注册流程(使用 CompleteRegistrationFlow"""
# 发送开始消息
status_text = (
f"🔄 开始注册 {count} 个账号...\n"
f"{'✅ 将生成账单 URL' if generate_billing else '❌ 不生成账单'}\n"
)
if add_payment:
status_text += "✅ 将添加支付方式\n"
if output_format == "notion":
status_text += "✅ 将保存到 Notion\n"
else:
status_text += "📄 将输出 JSON 格式\n"
status_text += "\n⏳ 请稍候..."
status_msg = await message.reply_text(status_text)
# 初始化临时邮箱客户端
try:
api_base_url = TEMPMAIL_CONFIG.get('api_base_url')
username = TEMPMAIL_CONFIG.get('username')
password_cfg = TEMPMAIL_CONFIG.get('password')
admin_token = TEMPMAIL_CONFIG.get('admin_token')
if username and password_cfg:
tempmail_client = TempMailClient(
api_base_url=api_base_url,
username=username,
password=password_cfg
)
elif admin_token:
tempmail_client = TempMailClient(
api_base_url=api_base_url,
admin_token=admin_token
)
else:
await status_msg.edit_text("❌ 临时邮箱配置错误,请联系管理员")
return
except Exception as e:
await status_msg.edit_text(f"❌ 初始化失败: {str(e)}")
return
# 创建完整流程管理器
try:
flow = CompleteRegistrationFlow(tempmail_client)
if DEBUG:
print(f"✅ CompleteRegistrationFlow 初始化成功")
except Exception as e:
await status_msg.edit_text(f"❌ 流程管理器初始化失败: {str(e)}")
return
# 注册账号
success_accounts = []
failed_accounts = []
for i in range(1, count + 1):
try:
# 更新状态
progress_bar = "" * i + "" * (count - i)
await status_msg.edit_text(
f"🔄 **注册进度** [{i}/{count}]\n"
f"{progress_bar}\n\n"
f"✅ 成功: {len(success_accounts)}\n"
f"❌ 失败: {len(failed_accounts)}\n\n"
f"⏳ 正在处理第 {i} 个账号...",
parse_mode='Markdown'
)
# 生成密码
password = generate_random_password()
# 根据模式执行不同的注册流程
if add_payment:
# 完整流程:注册 + 账单 + 支付 + Notion/JSON
result = flow.register_with_payment(
password=password,
payment_info=payment_info,
save_to_notion=True,
output_format=output_format
)
elif generate_billing:
# 账单流程:注册 + 账单
result = flow.register_with_billing(password=password)
else:
# 基础流程:仅注册
result = flow.register_basic(password=password)
if not result.get('success'):
failed_accounts.append({
'email': result.get('email', 'N/A'),
'error': result.get('error', 'Unknown error')
})
continue
success_accounts.append(result)
except Exception as e:
failed_accounts.append({
'email': 'N/A',
'error': str(e)
})
# 发送结果摘要
progress_bar = "" * count
await status_msg.edit_text(
f"🎉 **注册完成!** [{count}/{count}]\n"
f"{progress_bar}\n\n"
f"✅ 成功: **{len(success_accounts)}**\n"
f"❌ 失败: **{len(failed_accounts)}**\n\n"
f"📨 正在发送账号信息...",
parse_mode='Markdown'
)
# 发送每个成功的账号
for idx, acc in enumerate(success_accounts, 1):
# 判断输出格式
if output_format == "json":
# JSON 格式输出
json_output = {
"account": acc.get('email'),
"password": acc.get('password'),
"token": acc.get('access_token', '')
}
import json as json_module
json_text = json_module.dumps(json_output, indent=2, ensure_ascii=False)
account_text = (
f"╔═══════════════════\n"
f"║ 📄 **账号 #{idx} (JSON)**\n"
f"╚═══════════════════\n\n"
f"```json\n{json_text}\n```"
)
else:
# Notion 格式输出(原来的显示方式)
account_text = (
f"╔═══════════════════\n"
f"║ 🎯 **账号 #{idx}**\n"
f"╚═══════════════════\n\n"
f"📧 **邮箱**\n"
f"`{acc['email']}`\n\n"
f"🔑 **密码**\n"
f"`{acc['password']}`\n"
)
if 'access_token' in acc:
account_text += f"\n🎫 **Access Token**\n`{acc['access_token'][:50]}...`\n"
if 'checkout_url' in acc:
account_text += f"\n💳 **账单链接**\n[点击打开支付页面]({acc['checkout_url']})\n"
elif 'billing_error' in acc:
account_text += f"\n⚠️ 账单: {acc['billing_error']}\n"
# 支付信息
if 'payment_added' in acc and acc['payment_added']:
account_text += f"\n✅ **支付方式**\nSEPA 已添加\n"
if 'iban' in acc:
account_text += f"\n🏦 **IBAN**\n`{acc['iban']}`\n"
elif 'payment_error' in acc:
account_text += f"\n⚠️ 支付: {acc['payment_error']}\n"
# Notion 保存状态
if 'notion_saved' in acc and acc['notion_saved']:
account_text += f"\n📝 **Notion**\n✅ 已保存到数据库\n"
elif 'notion_error' in acc:
account_text += f"\n⚠️ Notion: {acc['notion_error']}\n"
account_text += "\n━━━━━━━━━━━━━━━━━━━"
await message.reply_text(account_text, parse_mode='Markdown')
# 如果有失败的账号
if failed_accounts:
failed_text = "⚠️ **失败列表**\n\n"
for idx, acc in enumerate(failed_accounts, 1):
failed_text += f"`{idx}.` {acc['email']}\n{acc['error']}\n\n"
await message.reply_text(failed_text, parse_mode='Markdown')
# 如果是 JSON 格式且批量注册(>=2个账号生成并发送完整的 JSON 文件
if output_format == "json" and count >= 2 and success_accounts:
import json as json_module
import tempfile
from datetime import datetime
# 构建完整的 JSON 数据
json_data = []
for acc in success_accounts:
json_data.append({
"account": acc.get('email'),
"password": acc.get('password'),
"token": acc.get('access_token', '')
})
# 创建临时文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix=f'_accounts_{timestamp}.json', delete=False, encoding='utf-8')
json_module.dump(json_data, temp_file, indent=2, ensure_ascii=False)
temp_file.close()
# 发送文件
try:
with open(temp_file.name, 'rb') as f:
await message.reply_document(
document=f,
filename=f"openai_accounts_{timestamp}.json",
caption=f"📦 **批量注册完整 JSON 文件**\n\n✅ 包含 {len(success_accounts)} 个成功账号"
)
# 删除临时文件
import os
os.unlink(temp_file.name)
except Exception as e:
await message.reply_text(f"⚠️ 发送 JSON 文件失败: {str(e)}")
# 更新最终状态消息
await status_msg.edit_text(
f"✅ **全部完成!**\n\n"
f"📊 **统计**\n"
f"• 总数: {count}\n"
f"• 成功: {len(success_accounts)}\n"
f"• 失败: {len(failed_accounts)}\n\n"
f"{'✅ 账号信息已发送' if success_accounts else '❌ 没有成功的账号'}",
parse_mode='Markdown'
)
async def handle_text_message(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理文本消息(用于自定义数量输入)"""
user_id = update.effective_user.id
if not check_authorization(user_id):
return
# 检查是否在等待支付数量输入
if context.user_data.get('awaiting_payment_count'):
try:
count = int(update.message.text.strip())
if count <= 0 or count > 20:
await update.message.reply_text(
"❌ 请提供有效的数量 (1-20)\n\n"
"请重新输入数字:"
)
return
# 清除等待标志
context.user_data['awaiting_payment_count'] = False
context.user_data['payment_count'] = count
# 询问地区
keyboard = [
[InlineKeyboardButton("🇩🇪 德国地址", callback_data="payment_region_de")],
[InlineKeyboardButton("🇺🇸 美国地址", callback_data="payment_region_us")],
[InlineKeyboardButton("🌍 使用默认", callback_data="payment_region_default")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
f"✅ 将注册 {count} 个完整账号\n\n"
"请选择账单地址信息:",
reply_markup=reply_markup
)
except ValueError:
await update.message.reply_text(
"❌ 请输入有效的数字 (1-20)\n\n"
"请重新输入:"
)
async def error_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""全局错误处理"""
print(f"❌ Error: {context.error}")
if update and update.effective_message:
await update.effective_message.reply_text(
f"❌ 发生错误: {str(context.error)}\n\n"
"请稍后重试或联系管理员。"
)
def main():
"""启动 bot"""
if not BOT_TOKEN:
print("❌ Error: TELEGRAM_BOT_TOKEN not set!")
print("Please set environment variable: export TELEGRAM_BOT_TOKEN='your_token'")
sys.exit(1)
# 检查配置
api_base_url = TEMPMAIL_CONFIG.get('api_base_url')
if not api_base_url or 'your.tempmail.domain' in api_base_url:
print("❌ Error: TEMPMAIL_CONFIG not configured in config.py")
sys.exit(1)
print("🤖 Starting Telegram Bot...")
print(f"📋 Allowed Users: {len(ALLOWED_USERS) if ALLOWED_USERS else 'ALL (Public)'}")
# 创建应用
application = Application.builder().token(BOT_TOKEN).build()
# 添加处理器
application.add_handler(CommandHandler("start", start))
application.add_handler(CommandHandler("help", help_command))
application.add_handler(CommandHandler("register", register_single))
application.add_handler(CommandHandler("batch", batch_register))
application.add_handler(CommandHandler("payment", payment_register))
application.add_handler(CallbackQueryHandler(button_callback))
application.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_text_message))
# 错误处理
application.add_error_handler(error_handler)
# 启动 bot
print("✅ Bot started successfully!")
print("Press Ctrl+C to stop")
application.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == '__main__':
main()