#!/usr/bin/env python3 """ 完整流程脚本:注册 -> 获取Token -> 生成账单 -> 支付 输出格式:{"account": "email", "password": "pwd", "token": "token"} """ import sys import json import time from typing import Optional, Dict, List from modules.register import OpenAIRegistrar from modules.tempmail import TempMailClient from modules.billing import EUBillingGenerator from modules.iban_generator import GermanIbanGenerator from config import TEMPMAIL_CONFIG import secrets import string def generate_random_password(length: int = 16) -> str: """生成随机密码(满足OpenAI要求:大小写字母+数字+特殊字符)""" # 确保包含所有类型的字符 lowercase = string.ascii_lowercase uppercase = string.ascii_uppercase digits = string.digits special = '!@#$%^&*' # 至少包含一个每种类型 password = [ secrets.choice(lowercase), secrets.choice(uppercase), secrets.choice(digits), secrets.choice(special) ] # 填充剩余长度 all_chars = lowercase + uppercase + digits + special password.extend(secrets.choice(all_chars) for _ in range(length - 4)) # 打乱顺序 secrets.SystemRandom().shuffle(password) return ''.join(password) def generate_random_name(): """生成随机德国姓名""" import random first_names = ["Max", "Hans", "Klaus", "Franz", "Otto", "Karl", "Fritz", "Wilhelm", "Heinrich", "Ludwig"] last_names = ["Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner", "Becker", "Schulz", "Hoffmann"] return f"{random.choice(first_names)} {random.choice(last_names)}" def generate_random_address(): """生成随机德国地址""" import random streets = ["Hauptstraße", "Bahnhofstraße", "Berliner Straße", "Schulstraße", "Gartenstraße"] cities = ["Berlin", "München", "Hamburg", "Frankfurt", "Köln", "Stuttgart", "Dresden", "Leipzig"] street_num = random.randint(1, 200) street = f"{random.choice(streets)} {street_num}" city = random.choice(cities) postal_code = f"{random.randint(10000, 99999)}" return { "address_line1": street, "city": city, "postal_code": postal_code, "state": city, "country": "DE" } def complete_flow() -> Optional[Dict]: """ 完整流程:注册 -> Token -> 账单 -> 支付 返回格式: { "account": "email@example.com", "password": "password123", "token": "eyJ..." } """ print(f"\n{'='*80}") print(f"开始完整注册流程") print(f"{'='*80}\n") try: # 1. 初始化临时邮箱 print("[1/6] 初始化临时邮箱...") tempmail = TempMailClient( api_base_url=TEMPMAIL_CONFIG.get('api_base_url'), username=TEMPMAIL_CONFIG.get('username'), password=TEMPMAIL_CONFIG.get('password'), admin_token=TEMPMAIL_CONFIG.get('admin_token') ) print("✅ 临时邮箱初始化成功") # 2. 创建注册器 print("\n[2/6] 创建注册器...") registrar = OpenAIRegistrar(tempmail_client=tempmail) print("✅ 注册器创建成功") # 3. 执行注册(自动生成邮箱和密码) print("\n[3/6] 开始注册账号...") # 生成随机密码 password = generate_random_password() print(f" 生成密码: {password}") register_result = registrar.register_with_auto_email(password=password) if not register_result.get('success'): error = register_result.get('error', 'Unknown error') print(f"❌ 注册失败: {error}") return None email = register_result['email'] password = register_result['password'] print(f"✅ 注册成功") print(f" 邮箱: {email}") print(f" 密码: {password}") # 4. 获取 Access Token print("\n[4/6] 获取 Access Token...") access_token = registrar._step5_get_access_token() if not access_token: print(f"❌ 无法获取 Access Token") return None print(f"✅ Token 获取成功") print(f" Token: {access_token[:50]}...") # 5. 生成欧洲账单 print("\n[5/6] 生成欧洲账单...") billing_gen = EUBillingGenerator() billing_result = billing_gen.generate_checkout_url(access_token) if not billing_result.success: print(f"❌ 账单生成失败: {billing_result.error}") return None checkout_url = billing_result.checkout_url print(f"✅ 账单生成成功") print(f" URL: {checkout_url[:60]}...") # 6. 添加支付方式 print("\n[6/6] 添加支付方式...") # 生成随机IBAN和地址 iban_generator = GermanIbanGenerator() iban = iban_generator.generate(1)[0] name = generate_random_name() address = generate_random_address() print(f" IBAN: {iban}") print(f" 姓名: {name}") print(f" 地址: {address['city']}, {address['address_line1']}") payment_result = registrar.add_payment_method( checkout_session_url=checkout_url, iban=iban, name=name, email=email, address_line1=address['address_line1'], city=address['city'], postal_code=address['postal_code'], state=address['state'], country=address['country'] ) if not payment_result.get('success'): error = payment_result.get('error', 'Unknown error') print(f"❌ 支付失败: {error}") return None print(f"✅ 支付成功") # 7. 返回最终结果 result = { "account": email, "password": password, "token": access_token } print(f"\n{'='*80}") print(f"✅ 完整流程执行成功") print(f"{'='*80}\n") return result except Exception as e: print(f"\n❌ 流程异常: {e}") import traceback traceback.print_exc() return None def batch_flow(count: int, output_file: str = "complete_accounts.json", delay: int = 10) -> List[Dict]: """ 批量执行完整流程 Args: count: 要注册的账号数量 output_file: 输出文件路径 delay: 每个账号之间的延迟(秒) Returns: 成功账号的列表 """ print(f"\n{'='*80}") print(f"批量注册流程") print(f"数量: {count} 个账号") print(f"输出: {output_file}") print(f"{'='*80}\n") results = [] success_count = 0 failed_count = 0 for i in range(1, count + 1): print(f"\n{'='*80}") print(f"[{i}/{count}] 开始处理第 {i} 个账号") print(f"{'='*80}") result = complete_flow() if result: results.append(result) success_count += 1 print(f"\n✅ [{i}/{count}] 成功") # 实时保存进度 with open(output_file, 'w', encoding='utf-8') as f: json.dump(results, f, indent=2, ensure_ascii=False) print(f"💾 进度已保存") else: failed_count += 1 print(f"\n❌ [{i}/{count}] 失败") # 延迟 if i < count: print(f"\n⏳ 等待 {delay} 秒后继续...") time.sleep(delay) # 最终统计 print(f"\n{'='*80}") print(f"批量处理完成") print(f"{'='*80}") print(f"总数: {count}") print(f"成功: {success_count}") print(f"失败: {failed_count}") print(f"成功率: {success_count/count*100:.1f}%") print(f"{'='*80}\n") return results def main(): """主函数""" import argparse parser = argparse.ArgumentParser(description='完整注册流程:注册 -> Token -> 账单 -> 支付') parser.add_argument('-c', '--count', type=int, default=1, help='要注册的账号数量(默认1)') parser.add_argument('-o', '--output', type=str, default='complete_accounts.json', help='输出文件(默认complete_accounts.json)') parser.add_argument('-d', '--delay', type=int, default=10, help='每个账号之间的延迟秒数(默认10)') args = parser.parse_args() if args.count == 1: # 单个账号注册 result = complete_flow() if result: # 输出 JSON print(f"\n📋 JSON 输出:") print(json.dumps(result, indent=2, ensure_ascii=False)) # 保存到文件 with open(args.output, 'w', encoding='utf-8') as f: json.dump([result], f, indent=2, ensure_ascii=False) print(f"\n💾 已保存到: {args.output}") else: print(f"\n❌ 注册失败") sys.exit(1) else: # 批量注册 results = batch_flow( count=args.count, output_file=args.output, delay=args.delay ) if results: print(f"\n📋 JSON 输出:") print(json.dumps(results, indent=2, ensure_ascii=False)) print(f"\n💾 结果已保存到: {args.output}") if __name__ == "__main__": main()