307 lines
9.4 KiB
Python
307 lines
9.4 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
完整流程脚本:注册 -> 获取Token -> 生成账单 -> 支付
|
||
输出格式:{"account": "email", "password": "pwd", "token": "token"}
|
||
"""
|
||
|
||
import sys
|
||
import json
|
||
import time
|
||
from typing import Optional, Dict, List
|
||
|
||
from modules.register import OpenAIRegistrar
|
||
from modules.tempmail import TempMailClient
|
||
from modules.billing import EUBillingGenerator
|
||
from modules.iban_generator import GermanIbanGenerator
|
||
from config import TEMPMAIL_CONFIG
|
||
import secrets
|
||
import string
|
||
|
||
|
||
def generate_random_password(length: int = 16) -> str:
|
||
"""生成随机密码(满足OpenAI要求:大小写字母+数字+特殊字符)"""
|
||
# 确保包含所有类型的字符
|
||
lowercase = string.ascii_lowercase
|
||
uppercase = string.ascii_uppercase
|
||
digits = string.digits
|
||
special = '!@#$%^&*'
|
||
|
||
# 至少包含一个每种类型
|
||
password = [
|
||
secrets.choice(lowercase),
|
||
secrets.choice(uppercase),
|
||
secrets.choice(digits),
|
||
secrets.choice(special)
|
||
]
|
||
|
||
# 填充剩余长度
|
||
all_chars = lowercase + uppercase + digits + special
|
||
password.extend(secrets.choice(all_chars) for _ in range(length - 4))
|
||
|
||
# 打乱顺序
|
||
secrets.SystemRandom().shuffle(password)
|
||
|
||
return ''.join(password)
|
||
|
||
|
||
def generate_random_name():
|
||
"""生成随机德国姓名"""
|
||
import random
|
||
first_names = ["Max", "Hans", "Klaus", "Franz", "Otto", "Karl", "Fritz", "Wilhelm", "Heinrich", "Ludwig"]
|
||
last_names = ["Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner", "Becker", "Schulz", "Hoffmann"]
|
||
return f"{random.choice(first_names)} {random.choice(last_names)}"
|
||
|
||
|
||
def generate_random_address():
|
||
"""生成随机德国地址"""
|
||
import random
|
||
streets = ["Hauptstraße", "Bahnhofstraße", "Berliner Straße", "Schulstraße", "Gartenstraße"]
|
||
cities = ["Berlin", "München", "Hamburg", "Frankfurt", "Köln", "Stuttgart", "Dresden", "Leipzig"]
|
||
|
||
street_num = random.randint(1, 200)
|
||
street = f"{random.choice(streets)} {street_num}"
|
||
city = random.choice(cities)
|
||
postal_code = f"{random.randint(10000, 99999)}"
|
||
|
||
return {
|
||
"address_line1": street,
|
||
"city": city,
|
||
"postal_code": postal_code,
|
||
"state": city,
|
||
"country": "DE"
|
||
}
|
||
|
||
|
||
def complete_flow() -> Optional[Dict]:
|
||
"""
|
||
完整流程:注册 -> Token -> 账单 -> 支付
|
||
|
||
返回格式:
|
||
{
|
||
"account": "email@example.com",
|
||
"password": "password123",
|
||
"token": "eyJ..."
|
||
}
|
||
"""
|
||
|
||
print(f"\n{'='*80}")
|
||
print(f"开始完整注册流程")
|
||
print(f"{'='*80}\n")
|
||
|
||
try:
|
||
# 1. 初始化临时邮箱
|
||
print("[1/6] 初始化临时邮箱...")
|
||
tempmail = TempMailClient(
|
||
api_base_url=TEMPMAIL_CONFIG.get('api_base_url'),
|
||
username=TEMPMAIL_CONFIG.get('username'),
|
||
password=TEMPMAIL_CONFIG.get('password'),
|
||
admin_token=TEMPMAIL_CONFIG.get('admin_token')
|
||
)
|
||
print("✅ 临时邮箱初始化成功")
|
||
|
||
# 2. 创建注册器
|
||
print("\n[2/6] 创建注册器...")
|
||
registrar = OpenAIRegistrar(tempmail_client=tempmail)
|
||
print("✅ 注册器创建成功")
|
||
|
||
# 3. 执行注册(自动生成邮箱和密码)
|
||
print("\n[3/6] 开始注册账号...")
|
||
|
||
# 生成随机密码
|
||
password = generate_random_password()
|
||
print(f" 生成密码: {password}")
|
||
|
||
register_result = registrar.register_with_auto_email(password=password)
|
||
|
||
if not register_result.get('success'):
|
||
error = register_result.get('error', 'Unknown error')
|
||
print(f"❌ 注册失败: {error}")
|
||
return None
|
||
|
||
email = register_result['email']
|
||
password = register_result['password']
|
||
print(f"✅ 注册成功")
|
||
print(f" 邮箱: {email}")
|
||
print(f" 密码: {password}")
|
||
|
||
# 4. 获取 Access Token
|
||
print("\n[4/6] 获取 Access Token...")
|
||
access_token = registrar._step5_get_access_token()
|
||
|
||
if not access_token:
|
||
print(f"❌ 无法获取 Access Token")
|
||
return None
|
||
|
||
print(f"✅ Token 获取成功")
|
||
print(f" Token: {access_token[:50]}...")
|
||
|
||
# 5. 生成欧洲账单
|
||
print("\n[5/6] 生成欧洲账单...")
|
||
billing_gen = EUBillingGenerator()
|
||
billing_result = billing_gen.generate_checkout_url(access_token)
|
||
|
||
if not billing_result.success:
|
||
print(f"❌ 账单生成失败: {billing_result.error}")
|
||
return None
|
||
|
||
checkout_url = billing_result.checkout_url
|
||
print(f"✅ 账单生成成功")
|
||
print(f" URL: {checkout_url[:60]}...")
|
||
|
||
# 6. 添加支付方式
|
||
print("\n[6/6] 添加支付方式...")
|
||
|
||
# 生成随机IBAN和地址
|
||
iban_generator = GermanIbanGenerator()
|
||
iban = iban_generator.generate(1)[0]
|
||
name = generate_random_name()
|
||
address = generate_random_address()
|
||
|
||
print(f" IBAN: {iban}")
|
||
print(f" 姓名: {name}")
|
||
print(f" 地址: {address['city']}, {address['address_line1']}")
|
||
|
||
payment_result = registrar.add_payment_method(
|
||
checkout_session_url=checkout_url,
|
||
iban=iban,
|
||
name=name,
|
||
email=email,
|
||
address_line1=address['address_line1'],
|
||
city=address['city'],
|
||
postal_code=address['postal_code'],
|
||
state=address['state'],
|
||
country=address['country']
|
||
)
|
||
|
||
if not payment_result.get('success'):
|
||
error = payment_result.get('error', 'Unknown error')
|
||
print(f"❌ 支付失败: {error}")
|
||
return None
|
||
|
||
print(f"✅ 支付成功")
|
||
|
||
# 7. 返回最终结果
|
||
result = {
|
||
"account": email,
|
||
"password": password,
|
||
"token": access_token
|
||
}
|
||
|
||
print(f"\n{'='*80}")
|
||
print(f"✅ 完整流程执行成功")
|
||
print(f"{'='*80}\n")
|
||
|
||
return result
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ 流程异常: {e}")
|
||
import traceback
|
||
traceback.print_exc()
|
||
return None
|
||
|
||
|
||
def batch_flow(count: int, output_file: str = "complete_accounts.json", delay: int = 10) -> List[Dict]:
|
||
"""
|
||
批量执行完整流程
|
||
|
||
Args:
|
||
count: 要注册的账号数量
|
||
output_file: 输出文件路径
|
||
delay: 每个账号之间的延迟(秒)
|
||
|
||
Returns:
|
||
成功账号的列表
|
||
"""
|
||
|
||
print(f"\n{'='*80}")
|
||
print(f"批量注册流程")
|
||
print(f"数量: {count} 个账号")
|
||
print(f"输出: {output_file}")
|
||
print(f"{'='*80}\n")
|
||
|
||
results = []
|
||
success_count = 0
|
||
failed_count = 0
|
||
|
||
for i in range(1, count + 1):
|
||
print(f"\n{'='*80}")
|
||
print(f"[{i}/{count}] 开始处理第 {i} 个账号")
|
||
print(f"{'='*80}")
|
||
|
||
result = complete_flow()
|
||
|
||
if result:
|
||
results.append(result)
|
||
success_count += 1
|
||
print(f"\n✅ [{i}/{count}] 成功")
|
||
|
||
# 实时保存进度
|
||
with open(output_file, 'w', encoding='utf-8') as f:
|
||
json.dump(results, f, indent=2, ensure_ascii=False)
|
||
print(f"💾 进度已保存")
|
||
else:
|
||
failed_count += 1
|
||
print(f"\n❌ [{i}/{count}] 失败")
|
||
|
||
# 延迟
|
||
if i < count:
|
||
print(f"\n⏳ 等待 {delay} 秒后继续...")
|
||
time.sleep(delay)
|
||
|
||
# 最终统计
|
||
print(f"\n{'='*80}")
|
||
print(f"批量处理完成")
|
||
print(f"{'='*80}")
|
||
print(f"总数: {count}")
|
||
print(f"成功: {success_count}")
|
||
print(f"失败: {failed_count}")
|
||
print(f"成功率: {success_count/count*100:.1f}%")
|
||
print(f"{'='*80}\n")
|
||
|
||
return results
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
import argparse
|
||
|
||
parser = argparse.ArgumentParser(description='完整注册流程:注册 -> Token -> 账单 -> 支付')
|
||
parser.add_argument('-c', '--count', type=int, default=1, help='要注册的账号数量(默认1)')
|
||
parser.add_argument('-o', '--output', type=str, default='complete_accounts.json', help='输出文件(默认complete_accounts.json)')
|
||
parser.add_argument('-d', '--delay', type=int, default=10, help='每个账号之间的延迟秒数(默认10)')
|
||
|
||
args = parser.parse_args()
|
||
|
||
if args.count == 1:
|
||
# 单个账号注册
|
||
result = complete_flow()
|
||
|
||
if result:
|
||
# 输出 JSON
|
||
print(f"\n📋 JSON 输出:")
|
||
print(json.dumps(result, indent=2, ensure_ascii=False))
|
||
|
||
# 保存到文件
|
||
with open(args.output, 'w', encoding='utf-8') as f:
|
||
json.dump([result], f, indent=2, ensure_ascii=False)
|
||
print(f"\n💾 已保存到: {args.output}")
|
||
else:
|
||
print(f"\n❌ 注册失败")
|
||
sys.exit(1)
|
||
else:
|
||
# 批量注册
|
||
results = batch_flow(
|
||
count=args.count,
|
||
output_file=args.output,
|
||
delay=args.delay
|
||
)
|
||
|
||
if results:
|
||
print(f"\n📋 JSON 输出:")
|
||
print(json.dumps(results, indent=2, ensure_ascii=False))
|
||
print(f"\n💾 结果已保存到: {args.output}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|