Files
AutoDoneTeam/run_complete_flow.py
2026-01-15 19:30:01 +08:00

307 lines
9.4 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
完整流程脚本:注册 -> 获取Token -> 生成账单 -> 支付
输出格式:{"account": "email", "password": "pwd", "token": "token"}
"""
import sys
import json
import time
from typing import Optional, Dict, List
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from modules.billing import EUBillingGenerator
from modules.iban_generator import GermanIbanGenerator
from config import TEMPMAIL_CONFIG
import secrets
import string
def generate_random_password(length: int = 16) -> str:
    """Generate a random password that contains every character class.

    The result always includes at least one lowercase letter, one uppercase
    letter, one digit and one character from ``!@#$%^&*``. The remaining
    positions are drawn from the union of those four sets, and the whole
    password is then shuffled so the guaranteed characters do not sit at
    fixed positions.

    Args:
        length: Total number of characters in the password. Must be at
            least 4 so that one character of each class fits.

    Returns:
        A password string of exactly ``length`` characters.

    Raises:
        ValueError: If ``length`` is smaller than 4.
    """
    lowercase = string.ascii_lowercase
    uppercase = string.ascii_uppercase
    digits = string.digits
    special = '!@#$%^&*'
    required_classes = (lowercase, uppercase, digits, special)

    # Previously a length below 4 silently produced a 4-character password
    # (the filler range was simply empty); reject it explicitly instead.
    if length < len(required_classes):
        raise ValueError(
            f"length must be at least {len(required_classes)}, got {length}"
        )

    # One guaranteed character from each class.
    password = [secrets.choice(chars) for chars in required_classes]

    # Fill the remaining positions from the combined alphabet.
    all_chars = lowercase + uppercase + digits + special
    password.extend(
        secrets.choice(all_chars) for _ in range(length - len(required_classes))
    )

    # Shuffle with an OS-backed RNG so the class order is not predictable.
    secrets.SystemRandom().shuffle(password)
    return ''.join(password)
def generate_random_name():
"""生成随机德国姓名"""
import random
first_names = ["Max", "Hans", "Klaus", "Franz", "Otto", "Karl", "Fritz", "Wilhelm", "Heinrich", "Ludwig"]
last_names = ["Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner", "Becker", "Schulz", "Hoffmann"]
return f"{random.choice(first_names)} {random.choice(last_names)}"
def generate_random_address():
"""生成随机德国地址"""
import random
streets = ["Hauptstraße", "Bahnhofstraße", "Berliner Straße", "Schulstraße", "Gartenstraße"]
cities = ["Berlin", "München", "Hamburg", "Frankfurt", "Köln", "Stuttgart", "Dresden", "Leipzig"]
street_num = random.randint(1, 200)
street = f"{random.choice(streets)} {street_num}"
city = random.choice(cities)
postal_code = f"{random.randint(10000, 99999)}"
return {
"address_line1": street,
"city": city,
"postal_code": postal_code,
"state": city,
"country": "DE"
}
def complete_flow() -> Optional[Dict]:
"""
完整流程:注册 -> Token -> 账单 -> 支付
返回格式:
{
"account": "email@example.com",
"password": "password123",
"token": "eyJ..."
}
"""
print(f"\n{'='*80}")
print(f"开始完整注册流程")
print(f"{'='*80}\n")
try:
# 1. 初始化临时邮箱
print("[1/6] 初始化临时邮箱...")
tempmail = TempMailClient(
api_base_url=TEMPMAIL_CONFIG.get('api_base_url'),
username=TEMPMAIL_CONFIG.get('username'),
password=TEMPMAIL_CONFIG.get('password'),
admin_token=TEMPMAIL_CONFIG.get('admin_token')
)
print("✅ 临时邮箱初始化成功")
# 2. 创建注册器
print("\n[2/6] 创建注册器...")
registrar = OpenAIRegistrar(tempmail_client=tempmail)
print("✅ 注册器创建成功")
# 3. 执行注册(自动生成邮箱和密码)
print("\n[3/6] 开始注册账号...")
# 生成随机密码
password = generate_random_password()
print(f" 生成密码: {password}")
register_result = registrar.register_with_auto_email(password=password)
if not register_result.get('success'):
error = register_result.get('error', 'Unknown error')
print(f"❌ 注册失败: {error}")
return None
email = register_result['email']
password = register_result['password']
print(f"✅ 注册成功")
print(f" 邮箱: {email}")
print(f" 密码: {password}")
# 4. 获取 Access Token
print("\n[4/6] 获取 Access Token...")
access_token = registrar._step5_get_access_token()
if not access_token:
print(f"❌ 无法获取 Access Token")
return None
print(f"✅ Token 获取成功")
print(f" Token: {access_token[:50]}...")
# 5. 生成欧洲账单
print("\n[5/6] 生成欧洲账单...")
billing_gen = EUBillingGenerator()
billing_result = billing_gen.generate_checkout_url(access_token)
if not billing_result.success:
print(f"❌ 账单生成失败: {billing_result.error}")
return None
checkout_url = billing_result.checkout_url
print(f"✅ 账单生成成功")
print(f" URL: {checkout_url[:60]}...")
# 6. 添加支付方式
print("\n[6/6] 添加支付方式...")
# 生成随机IBAN和地址
iban_generator = GermanIbanGenerator()
iban = iban_generator.generate(1)[0]
name = generate_random_name()
address = generate_random_address()
print(f" IBAN: {iban}")
print(f" 姓名: {name}")
print(f" 地址: {address['city']}, {address['address_line1']}")
payment_result = registrar.add_payment_method(
checkout_session_url=checkout_url,
iban=iban,
name=name,
email=email,
address_line1=address['address_line1'],
city=address['city'],
postal_code=address['postal_code'],
state=address['state'],
country=address['country']
)
if not payment_result.get('success'):
error = payment_result.get('error', 'Unknown error')
print(f"❌ 支付失败: {error}")
return None
print(f"✅ 支付成功")
# 7. 返回最终结果
result = {
"account": email,
"password": password,
"token": access_token
}
print(f"\n{'='*80}")
print(f"✅ 完整流程执行成功")
print(f"{'='*80}\n")
return result
except Exception as e:
print(f"\n❌ 流程异常: {e}")
import traceback
traceback.print_exc()
return None
def batch_flow(count: int, output_file: str = "complete_accounts.json", delay: int = 10) -> List[Dict]:
"""
批量执行完整流程
Args:
count: 要注册的账号数量
output_file: 输出文件路径
delay: 每个账号之间的延迟(秒)
Returns:
成功账号的列表
"""
print(f"\n{'='*80}")
print(f"批量注册流程")
print(f"数量: {count} 个账号")
print(f"输出: {output_file}")
print(f"{'='*80}\n")
results = []
success_count = 0
failed_count = 0
for i in range(1, count + 1):
print(f"\n{'='*80}")
print(f"[{i}/{count}] 开始处理第 {i} 个账号")
print(f"{'='*80}")
result = complete_flow()
if result:
results.append(result)
success_count += 1
print(f"\n✅ [{i}/{count}] 成功")
# 实时保存进度
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"💾 进度已保存")
else:
failed_count += 1
print(f"\n❌ [{i}/{count}] 失败")
# 延迟
if i < count:
print(f"\n⏳ 等待 {delay} 秒后继续...")
time.sleep(delay)
# 最终统计
print(f"\n{'='*80}")
print(f"批量处理完成")
print(f"{'='*80}")
print(f"总数: {count}")
print(f"成功: {success_count}")
print(f"失败: {failed_count}")
print(f"成功率: {success_count/count*100:.1f}%")
print(f"{'='*80}\n")
return results
def main():
"""主函数"""
import argparse
parser = argparse.ArgumentParser(description='完整注册流程:注册 -> Token -> 账单 -> 支付')
parser.add_argument('-c', '--count', type=int, default=1, help='要注册的账号数量默认1')
parser.add_argument('-o', '--output', type=str, default='complete_accounts.json', help='输出文件默认complete_accounts.json')
parser.add_argument('-d', '--delay', type=int, default=10, help='每个账号之间的延迟秒数默认10')
args = parser.parse_args()
if args.count == 1:
# 单个账号注册
result = complete_flow()
if result:
# 输出 JSON
print(f"\n📋 JSON 输出:")
print(json.dumps(result, indent=2, ensure_ascii=False))
# 保存到文件
with open(args.output, 'w', encoding='utf-8') as f:
json.dump([result], f, indent=2, ensure_ascii=False)
print(f"\n💾 已保存到: {args.output}")
else:
print(f"\n❌ 注册失败")
sys.exit(1)
else:
# 批量注册
results = batch_flow(
count=args.count,
output_file=args.output,
delay=args.delay
)
if results:
print(f"\n📋 JSON 输出:")
print(json.dumps(results, indent=2, ensure_ascii=False))
print(f"\n💾 结果已保存到: {args.output}")
if __name__ == "__main__":
main()