Files
autoPlus/main.py
2026-01-26 15:04:02 +08:00

398 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
OpenAI 账号自动注册系统 - 主程序入口
功能:
- 异步并发执行多个注册任务
- 代理池轮换
- 结果保存和统计
- 错误日志记录
- 失败重试机制
使用方法:
python main.py
"""
import asyncio
from pathlib import Path
from typing import List, Dict, Any
import time
from datetime import datetime
from core.session import OAISession
from core.flow import RegisterFlow
from config import load_config
from utils.logger import logger, setup_logger
import random
async def register_account(
config,
task_id: int,
retry_count: int = 0
) -> Dict[str, Any]:
"""
单个账号注册任务
参数:
config: AppConfig 配置对象
task_id: 任务 ID用于日志标识
retry_count: 当前重试次数
返回:
注册结果字典
"""
# 选择代理
proxy = config.proxy.get_next_proxy()
if proxy:
logger.info(f"[Task {task_id}] Using proxy: {_mask_proxy(proxy)}")
else:
logger.info(f"[Task {task_id}] No proxy configured, using direct connection")
# 创建会话(使用 with 语句自动清理资源)
session = None
try:
session = OAISession(
proxy=proxy,
impersonate=config.tls_impersonate
)
# 创建注册流程
flow = RegisterFlow(session, config)
# 执行注册
logger.info(f"[Task {task_id}] Starting registration for {flow.email}")
result = await flow.run()
# 添加任务信息
result["task_id"] = task_id
result["retry_count"] = retry_count
result["proxy"] = _mask_proxy(proxy) if proxy else "none"
# 保存成功的账号
if result["status"] == "success":
await save_account(result, config.accounts_output_file)
logger.success(
f"[Task {task_id}] ✅ Account created: {result['email']}:{result['password']}"
)
elif result["status"] == "pending_manual":
logger.warning(
f"[Task {task_id}] ⚠️ Manual intervention required for {result['email']}"
)
# 也保存部分成功的账号(用户可以手动完成)
await save_account(result, "accounts_pending.txt")
else:
logger.error(
f"[Task {task_id}] ❌ Registration failed: {result.get('error', 'Unknown error')}"
)
return result
except Exception as e:
logger.exception(f"[Task {task_id}] Unexpected error in registration task")
return {
"task_id": task_id,
"retry_count": retry_count,
"status": "failed",
"error": str(e),
"message": f"Task exception: {type(e).__name__}"
}
finally:
# 清理会话资源
if session:
try:
session.close()
except Exception as e:
logger.warning(f"[Task {task_id}] Error closing session: {e}")
async def save_account(result: Dict[str, Any], output_file: str):
"""
保存账号信息到文件
参数:
result: 注册结果字典
output_file: 输出文件路径
"""
# 确保目录存在
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
# 构建保存内容
email = result.get("email", "unknown")
password = result.get("password", "unknown")
status = result.get("status", "unknown")
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 格式: email:password | status | timestamp
line = f"{email}:{password} | {status} | {timestamp}\n"
# 异步写入文件(避免阻塞)
async with asyncio.Lock():
with open(output_path, "a", encoding="utf-8") as f:
f.write(line)
logger.debug(f"Account saved to {output_file}: {email}")
async def run_batch_registration(config, num_accounts: int) -> List[Dict[str, Any]]:
"""
批量注册账号(带重试机制)
参数:
config: AppConfig 配置对象
num_accounts: 要注册的账号数量
返回:
注册结果列表
"""
logger.info(f"Starting batch registration: {num_accounts} accounts")
logger.info(f"Max workers: {config.max_workers}")
logger.info(f"Retry limit: {config.retry_limit}")
# 创建任务列表
tasks = []
for i in range(num_accounts):
task = register_account_with_retry(config, task_id=i + 1)
tasks.append(task)
# 限制并发数量
if len(tasks) >= config.max_workers:
# 等待一批任务完成
results = await asyncio.gather(*tasks, return_exceptions=True)
tasks = []
# 短暂延迟,避免速率限制
await asyncio.sleep(random.uniform(1, 3))
# 执行剩余任务
if tasks:
remaining_results = await asyncio.gather(*tasks, return_exceptions=True)
logger.info("Batch registration completed")
async def register_account_with_retry(
config,
task_id: int
) -> Dict[str, Any]:
"""
带重试机制的账号注册
参数:
config: AppConfig 配置对象
task_id: 任务 ID
返回:
注册结果字典
"""
for retry_count in range(config.retry_limit + 1):
try:
result = await register_account(config, task_id, retry_count)
# 检查是否需要重试
should_retry = (
result["status"] in ["session_invalid", "rate_limited", "failed"]
and retry_count < config.retry_limit
)
if should_retry:
logger.warning(
f"[Task {task_id}] Retry {retry_count + 1}/{config.retry_limit} "
f"after {config.retry_delay}s delay"
)
await asyncio.sleep(config.retry_delay)
continue
else:
return result
except Exception as e:
logger.error(f"[Task {task_id}] Error in retry loop: {e}")
if retry_count < config.retry_limit:
await asyncio.sleep(config.retry_delay)
continue
else:
return {
"task_id": task_id,
"status": "failed",
"error": str(e)
}
# 不应该到达这里
return {"task_id": task_id, "status": "failed", "error": "Max retries exceeded"}
async def main():
"""
主函数
执行流程:
1. 加载配置
2. 验证配置
3. 执行注册任务
4. 统计结果
"""
print("=" * 70)
print(" OpenAI 账号自动注册系统")
print(" Version: 0.1.0")
print("=" * 70)
print()
# 加载配置
config = load_config()
# 设置日志级别
setup_logger(config.log_level)
# 打印配置摘要
config.print_summary()
# 验证配置
warnings = config.validate_config()
if warnings:
logger.warning("Configuration warnings detected:")
for warning in warnings:
logger.warning(f" ⚠️ {warning}")
print()
# 询问是否继续
user_input = input("Continue anyway? (y/N): ").strip().lower()
if user_input != "y":
logger.info("Aborted by user")
return
# 确保必要的目录存在
Path("logs").mkdir(exist_ok=True)
Path(config.accounts_output_file).parent.mkdir(parents=True, exist_ok=True)
# 询问要注册的账号数量
print()
try:
num_accounts = int(input("How many accounts to register? [default: 1]: ").strip() or "1")
if num_accounts < 1:
logger.error("Number of accounts must be at least 1")
return
except ValueError:
logger.error("Invalid number")
return
print()
logger.info(f"Will register {num_accounts} account(s)")
logger.info(f"Output file: {config.accounts_output_file}")
print()
# 开始注册
start_time = time.time()
# 创建任务并控制并发
all_results = []
task_id = 1
while task_id <= num_accounts:
# 创建一批任务(最多 max_workers 个)
batch_size = min(config.max_workers, num_accounts - task_id + 1)
tasks = [
register_account_with_retry(config, task_id=task_id + i)
for i in range(batch_size)
]
# 执行这批任务
logger.info(f"Executing batch: tasks {task_id} to {task_id + batch_size - 1}")
batch_results = await asyncio.gather(*tasks, return_exceptions=True)
# 处理结果
for result in batch_results:
if isinstance(result, Exception):
logger.error(f"Task raised exception: {result}")
all_results.append({
"status": "exception",
"error": str(result)
})
else:
all_results.append(result)
task_id += batch_size
# 批次间延迟(避免速率限制)
if task_id <= num_accounts:
delay = random.uniform(2, 5)
logger.info(f"Waiting {delay:.1f}s before next batch...")
await asyncio.sleep(delay)
# 计算耗时
elapsed_time = time.time() - start_time
# 统计结果
print()
print("=" * 70)
print(" Registration Summary")
print("=" * 70)
status_counts = {}
for result in all_results:
status = result.get("status", "unknown")
status_counts[status] = status_counts.get(status, 0) + 1
total = len(all_results)
success = status_counts.get("success", 0)
pending = status_counts.get("pending_manual", 0)
failed = total - success - pending
print(f"Total tasks: {total}")
print(f"✅ Success: {success} ({success/total*100:.1f}%)")
print(f"⚠️ Pending manual: {pending} ({pending/total*100:.1f}%)")
print(f"❌ Failed: {failed} ({failed/total*100:.1f}%)")
print()
print("Status breakdown:")
for status, count in sorted(status_counts.items()):
print(f" - {status}: {count}")
print()
print(f"Time elapsed: {elapsed_time:.1f}s")
print(f"Average time per account: {elapsed_time/total:.1f}s")
print("=" * 70)
# 保存详细结果到 JSON可选
if config.log_to_file:
import json
result_file = f"logs/results_{int(time.time())}.json"
with open(result_file, "w", encoding="utf-8") as f:
json.dump(all_results, f, indent=2, ensure_ascii=False)
logger.info(f"Detailed results saved to: {result_file}")
print()
if success > 0:
logger.success(f"{success} account(s) saved to: {config.accounts_output_file}")
if pending > 0:
logger.warning(f"⚠️ {pending} account(s) need manual completion: accounts_pending.txt")
print()
logger.info("Program finished")
def _mask_proxy(proxy: str) -> str:
"""
脱敏代理地址(隐藏用户名和密码)
例如: http://user:pass@1.2.3.4:8080 -> http://***:***@1.2.3.4:8080
"""
import re
return re.sub(r'://([^:]+):([^@]+)@', r'://***:***@', proxy)
if __name__ == "__main__":
try:
# 运行主程序
asyncio.run(main())
except KeyboardInterrupt:
print()
logger.warning("⚠️ Interrupted by user (Ctrl+C)")
print()
except Exception as e:
logger.exception("Fatal error in main program")
print()
print(f"❌ Fatal error: {e}")
exit(1)