完成更新:json文件返回;

This commit is contained in:
dela
2026-01-15 19:30:01 +08:00
parent 38b58cbe25
commit 1bc9a14579
7 changed files with 620 additions and 50 deletions

View File

@@ -7,17 +7,16 @@ SENTINEL_BASE_URL = "https://sentinel.openai.com/sentinel"
# 临时邮箱配置
TEMPMAIL_CONFIG = {
'api_base_url': 'https://your.tempmail.domain', # 你的临时邮箱 API 地址
'api_base_url': 'https://mail.copy.qzz.io', # 你的临时邮箱 API 地址
# 方式1用户名密码登录推荐
'username': 'your_username', # 你的临时邮箱系统用户名
'password': 'your_password', # 你的密码
'username': 'mygoteam', # 你的临时邮箱系统用户名
'password': 'mygo123123', # 你的密码
# 方式2JWT Token备用
# 'admin_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
# 域名选择(0=第1个域名, 1=第2个, 2=第3个)
'domain_index': 0, # 改成 0, 1, 或 2 来选择不同的域名后缀
'domain_index': 2, # 改成 0, 1, 或 2 来选择不同的域名后缀
}
# SDK 路径
@@ -25,17 +24,18 @@ SDK_JS_PATH = "assets/sdk.js"
# 浏览器指纹配置
FINGERPRINT_CONFIG = {
'user_agent': 'Mozilla/5.0 (X11; Linux x86_64; rv:146.0) Gecko/20100101 Firefox/146.0',
# 使用最新 Chrome 的 User-AgentWindows 10
'user_agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'screen_width': 1920,
'screen_height': 1080,
'languages': ['en-US', 'en'],
'hardware_concurrency': 8,
'platform': 'Linux x86_64',
'platform': 'Win32', # 改成 Windows匹配 UA
}
# HTTP 配置
HTTP_CONFIG = {
'impersonate': 'chrome110', # curl-cffi 的浏览器模拟
'impersonate': 'chrome131', # 使用最新的 Chrome 131 指纹
'timeout': 30,
}

View File

@@ -0,0 +1,139 @@
#!/usr/bin/env python3
"""
批量获取 OpenAI token
读取 needtoken.json使用 flow_email_token.py 登录获取 token保存为指定格式
"""
import json
import sys
import os
import time
from pathlib import Path
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from flow.flow_email_token import OpenAILogin
def load_accounts(json_path: str):
"""加载账号列表"""
with open(json_path, 'r', encoding='utf-8') as f:
return json.load(f)
def save_results(results: list, output_path: str):
"""保存结果到文件"""
with open(output_path, 'w', encoding='utf-8') as f:
# 写入表头
f.write("账号 | 密码 | token\n")
f.write("-" * 100 + "\n")
# 写入数据
for item in results:
email = item['email']
password = item['password']
token = item.get('token', 'FAILED')
f.write(f"{email} | {password} | {token}\n")
print(f"\n✅ 结果已保存到: {output_path}")
def batch_get_tokens(json_path: str, output_path: str, delay: int = 5):
"""批量获取 token"""
print(f"{'='*80}")
print(f"批量获取 OpenAI Token")
print(f"{'='*80}\n")
# 加载账号列表
print(f"📂 加载账号列表: {json_path}")
accounts = load_accounts(json_path)
total = len(accounts)
print(f" 找到 {total} 个账号\n")
# 结果列表
results = []
success_count = 0
failed_count = 0
# 逐个登录获取 token
for idx, account in enumerate(accounts, 1):
email = account['email']
password = account['password']
print(f"\n{'='*80}")
print(f"[{idx}/{total}] 正在处理: {email}")
print(f"{'='*80}")
try:
# 创建登录客户端
client = OpenAILogin()
# 执行登录
access_token = client.login(email, password)
if access_token:
print(f"\n✅ [{idx}/{total}] 成功获取 token")
results.append({
'email': email,
'password': password,
'token': access_token
})
success_count += 1
else:
print(f"\n❌ [{idx}/{total}] 登录失败")
results.append({
'email': email,
'password': password,
'token': 'FAILED'
})
failed_count += 1
except Exception as e:
print(f"\n❌ [{idx}/{total}] 异常: {e}")
results.append({
'email': email,
'password': password,
'token': f'ERROR: {str(e)}'
})
failed_count += 1
# 延迟(避免频繁请求)
if idx < total:
print(f"\n⏳ 等待 {delay} 秒后继续...")
time.sleep(delay)
# 保存结果
print(f"\n{'='*80}")
print(f"批量处理完成")
print(f"{'='*80}")
print(f"总数: {total}")
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
print(f"{'='*80}\n")
save_results(results, output_path)
return results
def main():
"""主函数"""
# 默认路径
json_path = "needtoken.json"
output_path = "tokens_result.txt"
delay = 5 # 每个账号之间的延迟(秒)
# 检查输入文件是否存在
if not os.path.exists(json_path):
print(f"❌ 文件不存在: {json_path}")
return
# 执行批量获取
batch_get_tokens(json_path, output_path, delay)
if __name__ == "__main__":
main()

View File

@@ -136,7 +136,8 @@ class CompleteRegistrationFlow:
self,
password: str,
payment_info: Optional[Dict] = None,
save_to_notion: bool = True
save_to_notion: bool = True,
output_format: str = "notion" # "notion" or "json"
) -> Dict:
"""
完整注册流程(注册 + 账单 + 支付 + Notion
@@ -145,6 +146,7 @@ class CompleteRegistrationFlow:
password: 账号密码
payment_info: 支付信息字典,包含 name, address_line1, city, postal_code, state, country
save_to_notion: 是否保存到 Notion
output_format: 输出格式,"notion""json"
Returns:
完整的注册结果
@@ -160,8 +162,9 @@ class CompleteRegistrationFlow:
checkout_url = result.get('checkout_url')
if not checkout_url:
result['payment_error'] = result.get('billing_error', '未生成账单链接')
if save_to_notion and self.notion_client:
if output_format == "notion" and save_to_notion and self.notion_client:
self._save_to_notion(result, password)
result['output_format'] = output_format
return result
# 生成 IBAN
@@ -175,8 +178,9 @@ class CompleteRegistrationFlow:
except Exception as e:
result['payment_error'] = f"生成 IBAN 失败: {str(e)}"
if save_to_notion and self.notion_client:
if output_format == "notion" and save_to_notion and self.notion_client:
self._save_to_notion(result, password)
result['output_format'] = output_format
return result
# 准备支付信息(使用默认值或用户提供的值)
@@ -216,10 +220,13 @@ class CompleteRegistrationFlow:
except Exception as e:
result['payment_error'] = str(e)
# 保存到 Notion
if save_to_notion and self.notion_client:
# 根据输出格式决定是否保存到 Notion
if output_format == "notion" and save_to_notion and self.notion_client:
self._save_to_notion(result, password)
# 标记输出格式
result['output_format'] = output_format
return result
except Exception as e:

View File

@@ -12,7 +12,7 @@ import os
from urllib.parse import urlparse, parse_qs
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
try:
from curl_cffi.requests import Session

View File

@@ -5,6 +5,13 @@ Stripe Payment Module Test Script
"""
import sys
from pathlib import Path
# Add project root to path if needed
project_root = Path(__file__).parent.parent
if str(project_root) not in sys.path:
sys.path.insert(0, str(project_root))
from modules.stripe_payment import StripePaymentHandler
from modules.iban_generator import GermanIbanGenerator

306
run_complete_flow.py Normal file
View File

@@ -0,0 +1,306 @@
#!/usr/bin/env python3
"""
完整流程脚本:注册 -> 获取Token -> 生成账单 -> 支付
输出格式:{"account": "email", "password": "pwd", "token": "token"}
"""
import sys
import json
import time
from typing import Optional, Dict, List
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from modules.billing import EUBillingGenerator
from modules.iban_generator import GermanIbanGenerator
from config import TEMPMAIL_CONFIG
import secrets
import string
def generate_random_password(length: int = 16) -> str:
"""生成随机密码满足OpenAI要求大小写字母+数字+特殊字符)"""
# 确保包含所有类型的字符
lowercase = string.ascii_lowercase
uppercase = string.ascii_uppercase
digits = string.digits
special = '!@#$%^&*'
# 至少包含一个每种类型
password = [
secrets.choice(lowercase),
secrets.choice(uppercase),
secrets.choice(digits),
secrets.choice(special)
]
# 填充剩余长度
all_chars = lowercase + uppercase + digits + special
password.extend(secrets.choice(all_chars) for _ in range(length - 4))
# 打乱顺序
secrets.SystemRandom().shuffle(password)
return ''.join(password)
def generate_random_name():
"""生成随机德国姓名"""
import random
first_names = ["Max", "Hans", "Klaus", "Franz", "Otto", "Karl", "Fritz", "Wilhelm", "Heinrich", "Ludwig"]
last_names = ["Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner", "Becker", "Schulz", "Hoffmann"]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def generate_random_address():
"""生成随机德国地址"""
import random
streets = ["Hauptstraße", "Bahnhofstraße", "Berliner Straße", "Schulstraße", "Gartenstraße"]
cities = ["Berlin", "München", "Hamburg", "Frankfurt", "Köln", "Stuttgart", "Dresden", "Leipzig"]
street_num = random.randint(1, 200)
street = f"{random.choice(streets)} {street_num}"
city = random.choice(cities)
postal_code = f"{random.randint(10000, 99999)}"
return {
"address_line1": street,
"city": city,
"postal_code": postal_code,
"state": city,
"country": "DE"
}
def complete_flow() -> Optional[Dict]:
"""
完整流程:注册 -> Token -> 账单 -> 支付
返回格式:
{
"account": "email@example.com",
"password": "password123",
"token": "eyJ..."
}
"""
print(f"\n{'='*80}")
print(f"开始完整注册流程")
print(f"{'='*80}\n")
try:
# 1. 初始化临时邮箱
print("[1/6] 初始化临时邮箱...")
tempmail = TempMailClient(
api_base_url=TEMPMAIL_CONFIG.get('api_base_url'),
username=TEMPMAIL_CONFIG.get('username'),
password=TEMPMAIL_CONFIG.get('password'),
admin_token=TEMPMAIL_CONFIG.get('admin_token')
)
print("✅ 临时邮箱初始化成功")
# 2. 创建注册器
print("\n[2/6] 创建注册器...")
registrar = OpenAIRegistrar(tempmail_client=tempmail)
print("✅ 注册器创建成功")
# 3. 执行注册(自动生成邮箱和密码)
print("\n[3/6] 开始注册账号...")
# 生成随机密码
password = generate_random_password()
print(f" 生成密码: {password}")
register_result = registrar.register_with_auto_email(password=password)
if not register_result.get('success'):
error = register_result.get('error', 'Unknown error')
print(f"❌ 注册失败: {error}")
return None
email = register_result['email']
password = register_result['password']
print(f"✅ 注册成功")
print(f" 邮箱: {email}")
print(f" 密码: {password}")
# 4. 获取 Access Token
print("\n[4/6] 获取 Access Token...")
access_token = registrar._step5_get_access_token()
if not access_token:
print(f"❌ 无法获取 Access Token")
return None
print(f"✅ Token 获取成功")
print(f" Token: {access_token[:50]}...")
# 5. 生成欧洲账单
print("\n[5/6] 生成欧洲账单...")
billing_gen = EUBillingGenerator()
billing_result = billing_gen.generate_checkout_url(access_token)
if not billing_result.success:
print(f"❌ 账单生成失败: {billing_result.error}")
return None
checkout_url = billing_result.checkout_url
print(f"✅ 账单生成成功")
print(f" URL: {checkout_url[:60]}...")
# 6. 添加支付方式
print("\n[6/6] 添加支付方式...")
# 生成随机IBAN和地址
iban_generator = GermanIbanGenerator()
iban = iban_generator.generate(1)[0]
name = generate_random_name()
address = generate_random_address()
print(f" IBAN: {iban}")
print(f" 姓名: {name}")
print(f" 地址: {address['city']}, {address['address_line1']}")
payment_result = registrar.add_payment_method(
checkout_session_url=checkout_url,
iban=iban,
name=name,
email=email,
address_line1=address['address_line1'],
city=address['city'],
postal_code=address['postal_code'],
state=address['state'],
country=address['country']
)
if not payment_result.get('success'):
error = payment_result.get('error', 'Unknown error')
print(f"❌ 支付失败: {error}")
return None
print(f"✅ 支付成功")
# 7. 返回最终结果
result = {
"account": email,
"password": password,
"token": access_token
}
print(f"\n{'='*80}")
print(f"✅ 完整流程执行成功")
print(f"{'='*80}\n")
return result
except Exception as e:
print(f"\n❌ 流程异常: {e}")
import traceback
traceback.print_exc()
return None
def batch_flow(count: int, output_file: str = "complete_accounts.json", delay: int = 10) -> List[Dict]:
"""
批量执行完整流程
Args:
count: 要注册的账号数量
output_file: 输出文件路径
delay: 每个账号之间的延迟(秒)
Returns:
成功账号的列表
"""
print(f"\n{'='*80}")
print(f"批量注册流程")
print(f"数量: {count} 个账号")
print(f"输出: {output_file}")
print(f"{'='*80}\n")
results = []
success_count = 0
failed_count = 0
for i in range(1, count + 1):
print(f"\n{'='*80}")
print(f"[{i}/{count}] 开始处理第 {i} 个账号")
print(f"{'='*80}")
result = complete_flow()
if result:
results.append(result)
success_count += 1
print(f"\n✅ [{i}/{count}] 成功")
# 实时保存进度
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"💾 进度已保存")
else:
failed_count += 1
print(f"\n❌ [{i}/{count}] 失败")
# 延迟
if i < count:
print(f"\n⏳ 等待 {delay} 秒后继续...")
time.sleep(delay)
# 最终统计
print(f"\n{'='*80}")
print(f"批量处理完成")
print(f"{'='*80}")
print(f"总数: {count}")
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
print(f"成功率: {success_count/count*100:.1f}%")
print(f"{'='*80}\n")
return results
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='完整注册流程:注册 -> Token -> 账单 -> 支付')
parser.add_argument('-c', '--count', type=int, default=1, help='要注册的账号数量默认1')
parser.add_argument('-o', '--output', type=str, default='complete_accounts.json', help='输出文件默认complete_accounts.json')
parser.add_argument('-d', '--delay', type=int, default=10, help='每个账号之间的延迟秒数默认10')
args = parser.parse_args()
if args.count == 1:
# 单个账号注册
result = complete_flow()
if result:
# 输出 JSON
print(f"\n📋 JSON 输出:")
print(json.dumps(result, indent=2, ensure_ascii=False))
# 保存到文件
with open(args.output, 'w', encoding='utf-8') as f:
json.dump([result], f, indent=2, ensure_ascii=False)
print(f"\n💾 已保存到: {args.output}")
else:
print(f"\n❌ 注册失败")
sys.exit(1)
else:
# 批量注册
results = batch_flow(
count=args.count,
output_file=args.output,
delay=args.delay
)
if results:
print(f"\n📋 JSON 输出:")
print(json.dumps(results, indent=2, ensure_ascii=False))
print(f"\n💾 结果已保存到: {args.output}")
if __name__ == "__main__":
main()

125
tg_bot.py
View File

@@ -321,17 +321,61 @@ async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
elif data.startswith("payment_region_"):
count = context.user_data.get('payment_count', 1)
# 显示开始信息
# 保存地区选择
context.user_data['payment_region'] = data
# 显示地区名称
region_name = {
"payment_region_de": "🇩🇪 德国",
"payment_region_us": "🇺🇸 美国",
"payment_region_default": "🌍 默认"
}.get(data, "🌍 默认")
# 询问输出方式
keyboard = [
[InlineKeyboardButton("📝 Notion 数据库", callback_data="payment_output_notion")],
[InlineKeyboardButton("📄 JSON 文件", callback_data="payment_output_json")]
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.message.edit_text(
f"✅ **地址信息:{region_name}**\n\n"
f"请选择输出方式:\n\n"
f"📝 **Notion 数据库** - 自动保存到 Notion\n"
f"📄 **JSON 文件** - 生成 JSON 格式输出\n\n"
f"JSON 格式示例:\n"
f"```json\n"
f'{{\n'
f' "account": "email@example.com",\n'
f' "password": "yourpassword",\n'
f' "token": "eyJ..."\n'
f'}}\n'
f"```",
reply_markup=reply_markup,
parse_mode='Markdown'
)
elif data.startswith("payment_output_"):
count = context.user_data.get('payment_count', 1)
region_data = context.user_data.get('payment_region', 'payment_region_default')
# 保存输出格式选择
output_format = "json" if data == "payment_output_json" else "notion"
context.user_data['output_format'] = output_format
# 显示地区和输出方式
region_name = {
"payment_region_de": "🇩🇪 德国",
"payment_region_us": "🇺🇸 美国",
"payment_region_default": "🌍 默认"
}.get(region_data, "🌍 默认")
output_name = "📝 Notion 数据库" if output_format == "notion" else "📄 JSON 文件"
await query.message.edit_text(
f"⚙️ **配置完成**\n\n"
f"注册数量:{count}\n"
f"地址信息:{region_name}\n\n"
f"地址信息:{region_name}\n"
f"输出方式:{output_name}\n\n"
f"即将开始完整注册流程...",
parse_mode='Markdown'
)
@@ -364,8 +408,15 @@ async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
}
}
payment_info = region_info.get(data, region_info["payment_region_default"])
await perform_registration(query.message, count, generate_billing=True, add_payment=True, payment_info=payment_info)
payment_info = region_info.get(region_data, region_info["payment_region_default"])
await perform_registration(
query.message,
count,
generate_billing=True,
add_payment=True,
payment_info=payment_info,
output_format=output_format
)
# Original callbacks
elif data == "register_single":
@@ -410,7 +461,7 @@ async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
await perform_registration(query.message, count, generate_billing)
async def perform_registration(message, count: int, generate_billing: bool, add_payment: bool = False, payment_info: dict = None):
async def perform_registration(message, count: int, generate_billing: bool, add_payment: bool = False, payment_info: dict = None, output_format: str = "notion"):
"""执行注册流程(使用 CompleteRegistrationFlow"""
# 发送开始消息
status_text = (
@@ -419,7 +470,10 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
)
if add_payment:
status_text += "✅ 将添加支付方式\n"
if output_format == "notion":
status_text += "✅ 将保存到 Notion\n"
else:
status_text += "📄 将输出 JSON 格式\n"
status_text += "\n⏳ 请稍候..."
status_msg = await message.reply_text(status_text)
@@ -481,11 +535,12 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
# 根据模式执行不同的注册流程
if add_payment:
# 完整流程:注册 + 账单 + 支付 + Notion
# 完整流程:注册 + 账单 + 支付 + Notion/JSON
result = flow.register_with_payment(
password=password,
payment_info=payment_info,
save_to_notion=True
save_to_notion=True,
output_format=output_format
)
elif generate_billing:
# 账单流程:注册 + 账单
@@ -522,6 +577,26 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
# 发送每个成功的账号
for idx, acc in enumerate(success_accounts, 1):
# 判断输出格式
if output_format == "json":
# JSON 格式输出
json_output = {
"account": acc.get('email'),
"password": acc.get('password'),
"token": acc.get('access_token', '')
}
import json as json_module
json_text = json_module.dumps(json_output, indent=2, ensure_ascii=False)
account_text = (
f"╔═══════════════════\n"
f"║ 📄 **账号 #{idx} (JSON)**\n"
f"╚═══════════════════\n\n"
f"```json\n{json_text}\n```"
)
else:
# Notion 格式输出(原来的显示方式)
account_text = (
f"╔═══════════════════\n"
f"║ 🎯 **账号 #{idx}**\n"
@@ -565,6 +640,42 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
failed_text += f"`{idx}.` {acc['email']}\n{acc['error']}\n\n"
await message.reply_text(failed_text, parse_mode='Markdown')
# 如果是 JSON 格式且批量注册(>=2个账号生成并发送完整的 JSON 文件
if output_format == "json" and count >= 2 and success_accounts:
import json as json_module
import tempfile
from datetime import datetime
# 构建完整的 JSON 数据
json_data = []
for acc in success_accounts:
json_data.append({
"account": acc.get('email'),
"password": acc.get('password'),
"token": acc.get('access_token', '')
})
# 创建临时文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
temp_file = tempfile.NamedTemporaryFile(mode='w', suffix=f'_accounts_{timestamp}.json', delete=False, encoding='utf-8')
json_module.dump(json_data, temp_file, indent=2, ensure_ascii=False)
temp_file.close()
# 发送文件
try:
with open(temp_file.name, 'rb') as f:
await message.reply_document(
document=f,
filename=f"openai_accounts_{timestamp}.json",
caption=f"📦 **批量注册完整 JSON 文件**\n\n✅ 包含 {len(success_accounts)} 个成功账号"
)
# 删除临时文件
import os
os.unlink(temp_file.name)
except Exception as e:
await message.reply_text(f"⚠️ 发送 JSON 文件失败: {str(e)}")
# 更新最终状态消息
await status_msg.edit_text(
f"✅ **全部完成!**\n\n"