多线程

This commit is contained in:
dela
2026-01-16 20:19:31 +08:00
parent 1bc9a14579
commit bc9faee4d7

301
multi_thread_register.py Normal file
View File

@@ -0,0 +1,301 @@
#!/usr/bin/env python3
"""多线程自动注册 OpenAI 账号
输出格式: 账号|密码|token 的 JSON
使用方式:
python multi_thread_register.py --count 10 --threads 3
"""
import sys
import argparse
import secrets
import json
import threading
from queue import Queue
from typing import List, Dict
from datetime import datetime
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from config import TEMPMAIL_CONFIG, DEBUG
def generate_random_password(length: int = 16) -> str:
"""生成随机密码"""
chars = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789!@#$%^&*'
return ''.join(secrets.choice(chars) for _ in range(length))
class RegistrationWorker(threading.Thread):
"""注册工作线程"""
def __init__(self, thread_id: int, task_queue: Queue, result_queue: Queue, tempmail_client: TempMailClient):
super().__init__()
self.thread_id = thread_id
self.task_queue = task_queue
self.result_queue = result_queue
self.tempmail_client = tempmail_client
self.daemon = True
def run(self):
"""执行注册任务"""
while True:
try:
# 从队列获取任务
task = self.task_queue.get()
if task is None: # 结束信号
break
task_id = task['id']
password = task['password']
print(f"[Thread-{self.thread_id}] 开始注册账号 #{task_id}")
# 创建注册器
registrar = OpenAIRegistrar(tempmail_client=self.tempmail_client)
# 执行注册
result = registrar.register_with_auto_email(password)
# 如果注册成功,获取 access token
if result.get('success'):
try:
access_token = registrar._step5_get_access_token()
# 输出格式: 账号|密码|token
output_data = {
'account': result['email'],
'password': result['password'],
'token': access_token,
'verified': result.get('verified', False)
}
self.result_queue.put({
'success': True,
'task_id': task_id,
'data': output_data
})
print(f"[Thread-{self.thread_id}] ✅ 账号 #{task_id} 注册成功")
print(f" {result['email']}|{result['password']}|{access_token[:50]}...")
except Exception as e:
print(f"[Thread-{self.thread_id}] ⚠️ 账号 #{task_id} 注册成功但获取 token 失败: {e}")
self.result_queue.put({
'success': False,
'task_id': task_id,
'error': f'Token获取失败: {str(e)}',
'email': result.get('email')
})
else:
print(f"[Thread-{self.thread_id}] ❌ 账号 #{task_id} 注册失败: {result.get('error')}")
self.result_queue.put({
'success': False,
'task_id': task_id,
'error': result.get('error'),
'email': result.get('email')
})
except Exception as e:
print(f"[Thread-{self.thread_id}] ❌ 异常: {e}")
self.result_queue.put({
'success': False,
'task_id': task_id if 'task_id' in locals() else -1,
'error': str(e)
})
finally:
self.task_queue.task_done()
def multi_thread_register(count: int, threads: int, password: str = None, output_file: str = None):
"""多线程批量注册
Args:
count: 注册账号数量
threads: 线程数
password: 统一密码None 则每个账号生成不同密码)
output_file: 输出 JSON 文件路径
"""
print(f"\n{'='*60}")
print(f"多线程批量注册")
print(f"{'='*60}")
print(f"账号数量: {count}")
print(f"线程数: {threads}")
print(f"密码: {'统一密码' if password else '随机生成'}")
print(f"{'='*60}\n")
# 检查临时邮箱配置
api_base_url = TEMPMAIL_CONFIG.get('api_base_url')
username = TEMPMAIL_CONFIG.get('username')
password_cfg = TEMPMAIL_CONFIG.get('password')
admin_token = TEMPMAIL_CONFIG.get('admin_token')
if not api_base_url:
print("❌ 错误: TEMPMAIL_CONFIG.api_base_url 未配置")
sys.exit(1)
# 初始化临时邮箱客户端
if username and password_cfg:
tempmail_client = TempMailClient(
api_base_url=api_base_url,
username=username,
password=password_cfg
)
elif admin_token:
tempmail_client = TempMailClient(
api_base_url=api_base_url,
admin_token=admin_token
)
else:
print("❌ 错误: TEMPMAIL_CONFIG 未正确配置")
sys.exit(1)
# 创建任务队列和结果队列
task_queue = Queue()
result_queue = Queue()
# 创建工作线程
workers = []
for i in range(threads):
worker = RegistrationWorker(i + 1, task_queue, result_queue, tempmail_client)
worker.start()
workers.append(worker)
print(f"✅ 启动线程 #{i + 1}")
# 添加任务到队列
print(f"\n添加 {count} 个注册任务到队列...\n")
for i in range(1, count + 1):
task_password = password if password else generate_random_password()
task_queue.put({
'id': i,
'password': task_password
})
# 等待所有任务完成
task_queue.join()
# 发送结束信号
for _ in range(threads):
task_queue.put(None)
# 等待所有线程结束
for worker in workers:
worker.join()
# 收集结果
success_accounts = []
failed_accounts = []
while not result_queue.empty():
result = result_queue.get()
if result['success']:
success_accounts.append(result['data'])
else:
failed_accounts.append({
'task_id': result['task_id'],
'error': result['error'],
'email': result.get('email', 'N/A')
})
# 打印统计
print(f"\n{'='*60}")
print(f"注册完成")
print(f"{'='*60}")
print(f"总数: {count}")
print(f"成功: {len(success_accounts)}")
print(f"失败: {len(failed_accounts)}")
print(f"{'='*60}\n")
# 输出成功账号(格式: 账号|密码|token
if success_accounts:
print(f"{''*60}")
print(f"成功账号列表:")
print(f"{''*60}")
for idx, acc in enumerate(success_accounts, 1):
print(f"{idx}. {acc['account']}|{acc['password']}|{acc['token'][:50]}...")
print(f"{''*60}\n")
# 保存到 JSON 文件
if not output_file:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_file = f"registered_accounts_{timestamp}.json"
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(success_accounts, f, indent=2, ensure_ascii=False)
print(f"✅ 已保存 {len(success_accounts)} 个账号到: {output_file}\n")
# 打印失败账号
if failed_accounts:
print(f"{''*60}")
print(f"失败账号:")
print(f"{''*60}")
for fail in failed_accounts:
print(f"#{fail['task_id']}: {fail['email']} - {fail['error']}")
print(f"{''*60}\n")
def main():
parser = argparse.ArgumentParser(
description='多线程自动注册 OpenAI 账号',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
python multi_thread_register.py --count 10 --threads 3
python multi_thread_register.py -c 20 -t 5 -o accounts.json
python multi_thread_register.py -c 5 -t 2 --password MyPassword123
"""
)
parser.add_argument(
'-c', '--count',
type=int,
required=True,
help='注册账号数量'
)
parser.add_argument(
'-t', '--threads',
type=int,
default=3,
help='线程数(默认: 3'
)
parser.add_argument(
'-p', '--password',
type=str,
default=None,
help='统一密码(默认: 每个账号随机生成)'
)
parser.add_argument(
'-o', '--output',
type=str,
default=None,
help='输出 JSON 文件路径(默认: registered_accounts_<timestamp>.json'
)
args = parser.parse_args()
# 验证参数
if args.count <= 0:
print("❌ 错误: count 必须大于 0")
sys.exit(1)
if args.threads <= 0:
print("❌ 错误: threads 必须大于 0")
sys.exit(1)
# 执行多线程注册
multi_thread_register(
count=args.count,
threads=args.threads,
password=args.password,
output_file=args.output
)
if __name__ == '__main__':
main()