Files
autoPlus/main.py
2026-01-29 22:41:16 +08:00

735 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
OpenAI 账号自动注册系统 - 主程序入口
功能:
- 异步并发执行多个注册任务
- 账号登录获取 access_token
- 代理池轮换
- 结果保存和统计
- 错误日志记录
- 失败重试机制
使用方法:
python main.py
"""
import asyncio
from pathlib import Path
from typing import List, Dict, Any
import time
from datetime import datetime
from core.session import OAISession
from core.flow import RegisterFlow
from core.checkout import CheckoutFlow
from config import load_config
from utils.logger import logger, setup_logger
import random
import json
async def login_account(
    config,
    email: str,
    password: str,
    task_id: int = 1
) -> Dict[str, Any]:
    """
    Run a single login task for one account.

    Args:
        config: Application config; ``config.proxy.get_next_proxy()`` and
            ``config.tls_impersonate`` are read here.
        email: Login e-mail address.
        password: Login password.
        task_id: Task number, used only to tag log lines and the result.

    Returns:
        The dict returned by ``session.login`` with ``task_id`` and ``proxy``
        added, or a ``status == "failed"`` dict if anything raised.
    """
    tag = f"[Login {task_id}]"

    # Pick a proxy and compute its masked form once for logs and the result.
    proxy = config.proxy.get_next_proxy()
    masked_proxy = _mask_proxy(proxy) if proxy else "none"
    if proxy:
        logger.info(f"{tag} Using proxy: {masked_proxy}")
    else:
        logger.info(f"{tag} No proxy configured, using direct connection")

    session = None
    try:
        session = OAISession(proxy=proxy, impersonate=config.tls_impersonate)

        logger.info(f"{tag} Starting login for {email}")
        outcome = await session.login(email, password)
        outcome["task_id"] = task_id
        outcome["proxy"] = masked_proxy

        if outcome["status"] != "success":
            logger.error(
                f"{tag} ❌ Login failed: {outcome.get('error', 'Unknown error')}"
            )
            return outcome

        logger.success(f"{tag} ✅ Login successful: {email}")
        # Persist the token to disk.
        await save_token(outcome, config)
        return outcome
    except Exception as exc:
        logger.exception(f"{tag} Unexpected error in login task")
        return {
            "task_id": task_id,
            "email": email,
            "status": "failed",
            "error": str(exc),
            "message": f"Task exception: {type(exc).__name__}"
        }
    finally:
        # Always release the session; a failing close() must not mask the result.
        if session:
            try:
                session.close()
            except Exception as close_err:
                logger.warning(f"{tag} Error closing session: {close_err}")
async def get_checkout_session(
    config,
    email: str,
    password: str,
    plan_name: str = "chatgptplusplan",
    task_id: int = 1
) -> Dict[str, Any]:
    """
    Log in to an account and create a checkout session for it.

    Args:
        config: Application config; ``config.proxy.get_next_proxy()`` and
            ``config.tls_impersonate`` are read here.
        email: Login e-mail address.
        password: Login password.
        plan_name: Subscription plan name passed to ``CheckoutFlow``
            (chatgptplusplan/chatgptproplan).
        task_id: Task number, used only to tag log lines and the result.

    Returns:
        The dict returned by ``CheckoutFlow.create_checkout_session`` with
        ``task_id``, ``email`` and ``proxy`` added, or a ``status == "failed"``
        dict when the login fails or anything raises.
    """
    tag = f"[Checkout {task_id}]"

    proxy = config.proxy.get_next_proxy()
    masked_proxy = _mask_proxy(proxy) if proxy else "none"
    if proxy:
        logger.info(f"{tag} Using proxy: {masked_proxy}")
    else:
        logger.info(f"{tag} No proxy configured, using direct connection")

    session = None
    try:
        session = OAISession(proxy=proxy, impersonate=config.tls_impersonate)

        # Step 1: log in.
        logger.info(f"{tag} Logging in as {email}...")
        login_outcome = await session.login(email, password)
        if login_outcome.get("status") != "success":
            login_error = login_outcome.get("error")
            logger.error(f"{tag} ❌ Login failed: {login_error}")
            return {
                "task_id": task_id,
                "email": email,
                "status": "failed",
                "error": f"Login failed: {login_error}"
            }

        # Step 2: create the checkout session on the logged-in session.
        logger.info(f"{tag} Creating checkout session for {plan_name}...")
        checkout_flow = CheckoutFlow(session, plan_name=plan_name)
        outcome = await checkout_flow.create_checkout_session()
        outcome["task_id"] = task_id
        outcome["email"] = email
        outcome["proxy"] = masked_proxy

        if outcome.get("status") != "success":
            logger.error(
                f"{tag} ❌ Checkout failed: {outcome.get('error', 'Unknown error')}"
            )
            return outcome

        logger.success(f"{tag} ✅ Checkout session created for {email}")
        # Persist the result.
        await save_checkout_result(outcome)
        return outcome
    except Exception as exc:
        logger.exception(f"{tag} Unexpected error")
        return {
            "task_id": task_id,
            "email": email,
            "status": "failed",
            "error": str(exc),
            "message": f"Task exception: {type(exc).__name__}"
        }
    finally:
        # Always release the session; a failing close() must not mask the result.
        if session:
            try:
                session.close()
            except Exception as close_err:
                logger.warning(f"{tag} Error closing session: {close_err}")
async def save_checkout_result(result: Dict[str, Any]):
    """
    Append one checkout-session record to ``output/checkouts.json``.

    The file holds a JSON list; it is read, extended by one record and
    rewritten in full.

    Args:
        result: Checkout result dict. The keys ``email``, ``plan_name``,
            ``checkout_session_id``, ``client_secret``, ``publishable_key``,
            ``payment_status`` and ``url`` are copied (missing keys become
            ``None``, a missing ``email`` becomes ``"unknown"``).
    """
    output_file = Path("output/checkouts.json")
    # Create the directory here so the write does not depend on a caller
    # having created it first.
    output_file.parent.mkdir(parents=True, exist_ok=True)
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Load existing records. An unreadable or malformed file is still
    # replaced by a fresh list (best effort), but no longer silently.
    checkouts = []
    if output_file.exists():
        try:
            with open(output_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            logger.warning(f"Could not read {output_file}, starting a new list: {e}")
        else:
            if isinstance(loaded, list):
                checkouts = loaded
            else:
                logger.warning(f"{output_file} does not contain a JSON list, starting a new list")

    checkouts.append({
        "email": result.get("email", "unknown"),
        "plan_name": result.get("plan_name"),
        "checkout_session_id": result.get("checkout_session_id"),
        "client_secret": result.get("client_secret"),
        "publishable_key": result.get("publishable_key"),
        "payment_status": result.get("payment_status"),
        "url": result.get("url"),
        "timestamp": timestamp
    })

    # The previous `async with asyncio.Lock():` created a brand-new lock on
    # every call and therefore excluded nothing. No lock is needed: this
    # coroutine contains no `await`, so the read-modify-write above cannot be
    # interleaved with another coroutine on the same event loop.
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(checkouts, f, indent=2, ensure_ascii=False)
    logger.info(f"Checkout saved to checkouts.json: {result.get('email')}")
async def run_batch_checkout(config, accounts: List[Dict], plan_name: str = "chatgptplusplan"):
    """
    Create checkout sessions for a list of accounts, one after another.

    Args:
        config: Application config, forwarded to ``get_checkout_session``.
        accounts: Account list ``[{"email": "...", "password": "..."}, ...]``.
        plan_name: Subscription plan name.

    Returns:
        List with one result dict per processed account. Entries lacking an
        e-mail or password are skipped and contribute no result.
    """
    logger.info(f"Starting batch checkout: {len(accounts)} accounts")
    collected = []

    for position, account in enumerate(accounts, 1):
        email = account.get("email", "")
        password = account.get("password", "")
        if not (email and password):
            logger.warning(f"[Checkout {position}] Skipping - missing email or password")
            continue

        collected.append(
            await get_checkout_session(
                config,
                email=email,
                password=password,
                plan_name=plan_name,
                task_id=position
            )
        )

        # Pause between accounts to stay under rate limits; not after the last one.
        if position >= len(accounts):
            continue
        delay = random.uniform(2, 4)
        logger.info(f"Waiting {delay:.1f}s before next checkout...")
        await asyncio.sleep(delay)

    return collected
async def save_token(result: Dict[str, Any], config):
    """
    Persist a login token to disk.

    Two files are written:
      * ``output/tokens.txt`` - one appended summary line per call in the form
        ``email | <first 50 chars of access token>... | timestamp``.
      * ``output/tokens/<email>.txt`` - full access/session token for the
        account (overwritten on every call).

    Args:
        result: Login result dict; ``email``, ``access_token`` and
            ``session_token`` are read.
        config: Configuration object. Not used by this function; kept so
            existing callers keep working.
    """
    output_path = Path("output/tokens.txt")
    output_path.parent.mkdir(parents=True, exist_ok=True)

    # `or` also covers keys that are present but None, which previously
    # crashed on the slice / .replace() below.
    email = result.get("email") or "unknown"
    access_token = result.get("access_token") or ""
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Format: email | access_token (truncated) | timestamp
    line = f"{email} | {access_token[:50]}... | {timestamp}\n"

    # The previous `async with asyncio.Lock():` built a new lock per call and
    # excluded nothing. This coroutine never awaits, so its file writes cannot
    # interleave with another coroutine on the same event loop.
    with open(output_path, "a", encoding="utf-8") as f:
        f.write(line)

    # Also store the complete token in a per-account file. Path separators are
    # neutralised so the e-mail can never address a file outside output/tokens.
    safe_name = email.replace("@", "_at_").replace("/", "_").replace("\\", "_")
    token_file = Path("output/tokens") / f"{safe_name}.txt"
    token_file.parent.mkdir(parents=True, exist_ok=True)
    with open(token_file, "w", encoding="utf-8") as f:
        f.write(f"Email: {email}\n")
        f.write(f"Access Token: {access_token}\n")
        f.write(f"Session Token: {result.get('session_token', 'N/A')}\n")
        f.write(f"Timestamp: {timestamp}\n")
    logger.debug(f"Token saved for {email}")
async def register_account(
    config,
    task_id: int,
    retry_count: int = 0
) -> Dict[str, Any]:
    """
    Run a single registration task.

    On a successful registration the same session is used to run
    ``LoginFlow`` and, if that succeeds, ``CheckoutFlow``; the account is then
    saved regardless of whether those follow-up steps succeeded.

    Args:
        config: Application config; ``config.proxy``, ``config.tls_impersonate``
            and ``config.accounts_output_file`` are read here.
        task_id: Task number, used to tag log lines and the result.
        retry_count: Current retry attempt, recorded in the result.

    Returns:
        The dict returned by ``RegisterFlow.run`` with ``task_id``,
        ``retry_count`` and ``proxy`` added (plus token / checkout fields when
        obtained), or a ``status == "failed"`` dict if anything raised.
    """
    tag = f"[Task {task_id}]"

    # Pick a proxy and compute its masked form once for logs and the result.
    proxy = config.proxy.get_next_proxy()
    masked_proxy = _mask_proxy(proxy) if proxy else "none"
    if proxy:
        logger.info(f"{tag} Using proxy: {masked_proxy}")
    else:
        logger.info(f"{tag} No proxy configured, using direct connection")

    # The session is closed in the `finally` clause below.
    session = None
    try:
        session = OAISession(proxy=proxy, impersonate=config.tls_impersonate)
        flow = RegisterFlow(session, config)

        logger.info(f"{tag} Starting registration for {flow.email}")
        outcome = await flow.run()

        # Attach task bookkeeping.
        outcome["task_id"] = task_id
        outcome["retry_count"] = retry_count
        outcome["proxy"] = masked_proxy

        status = outcome["status"]
        if status == "success":
            # Registration worked: log in straight away to obtain a token.
            logger.info(f"{tag} Registration done, now logging in to get token...")
            from core.login_flow import LoginFlow
            login_outcome = await LoginFlow(
                session, outcome["email"], outcome["password"]
            ).run()

            if login_outcome.get("status") != "success":
                logger.warning(f"{tag} ⚠️ Login failed: {login_outcome.get('error')}")
            else:
                access_token = login_outcome.get("access_token")
                session_token = login_outcome.get("session_token")
                outcome["access_token"] = access_token
                outcome["session_token"] = session_token
                # Mirror the tokens onto the session so CheckoutFlow can use them.
                session.access_token = access_token
                session.session_token = session_token
                logger.success(f"{tag} ✅ Token obtained!")

                # Create the checkout session.
                logger.info(f"{tag} Getting checkout session...")
                checkout_outcome = await CheckoutFlow(
                    session, plan_name="chatgptplusplan"
                ).create_checkout_session()

                if checkout_outcome.get("status") != "success":
                    logger.warning(f"{tag} ⚠️ Checkout failed: {checkout_outcome.get('error')}")
                else:
                    for key in (
                        "checkout_session_id",
                        "client_secret",
                        "publishable_key",
                        "payment_status",
                    ):
                        outcome[key] = checkout_outcome.get(key)
                    logger.success(f"{tag} ✅ Checkout session created!")
                    # Persist the checkout result together with the account e-mail.
                    checkout_outcome["email"] = outcome["email"]
                    await save_checkout_result(checkout_outcome)

            await save_account(outcome, config.accounts_output_file)
            logger.success(
                f"{tag} ✅ Account created: {outcome['email']}:{outcome['password']}"
            )
        elif status == "pending_manual":
            logger.warning(
                f"{tag} ⚠️ Manual intervention required for {outcome['email']}"
            )
            # Keep partially registered accounts too so they can be finished by hand.
            await save_account(outcome, "accounts_pending.txt")
        else:
            logger.error(
                f"{tag} ❌ Registration failed: {outcome.get('error', 'Unknown error')}"
            )
        return outcome
    except Exception as exc:
        logger.exception(f"{tag} Unexpected error in registration task")
        return {
            "task_id": task_id,
            "retry_count": retry_count,
            "status": "failed",
            "error": str(exc),
            "message": f"Task exception: {type(exc).__name__}"
        }
    finally:
        # Always release the session; a failing close() must not mask the result.
        if session:
            try:
                session.close()
            except Exception as close_err:
                logger.warning(f"{tag} Error closing session: {close_err}")
async def save_account(result: Dict[str, Any], output_file: str):
    """
    Persist one account record.

    The record is appended to the JSON list in ``output/accounts.json`` and a
    summary line ``email:password | status | timestamp`` is appended to
    ``output_file`` (legacy text format).

    Args:
        result: Registration result dict; ``email``, ``password``,
            ``access_token``, ``oai_did`` and ``status`` are read.
        output_file: Path of the legacy text file to append to.
    """
    output_path = Path(output_file)
    json_file = Path("output/accounts.json")
    # Both targets need their directory; previously only output_file's parent
    # was created, so the JSON write failed when output/ did not exist yet.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    json_file.parent.mkdir(parents=True, exist_ok=True)

    email = result.get("email", "unknown")
    password = result.get("password", "unknown")
    access_token = result.get("access_token", "")
    status = result.get("status", "unknown")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Load existing records. An unreadable or malformed file is still
    # replaced by a fresh list (best effort), but no longer silently.
    accounts = []
    if json_file.exists():
        try:
            with open(json_file, "r", encoding="utf-8") as f:
                loaded = json.load(f)
        except (OSError, ValueError) as e:  # JSONDecodeError is a ValueError
            logger.warning(f"Could not read {json_file}, starting a new list: {e}")
        else:
            if isinstance(loaded, list):
                accounts = loaded
            else:
                logger.warning(f"{json_file} does not contain a JSON list, starting a new list")

    accounts.append({
        "email": email,
        "password": password,
        "access_token": access_token,
        "oai_did": result.get("oai_did", ""),
        "status": status,
        "timestamp": timestamp
    })

    # The previous `async with asyncio.Lock():` built a new lock per call and
    # excluded nothing. This coroutine never awaits, so the read-modify-write
    # cannot interleave with another coroutine on the same event loop.
    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(accounts, f, indent=2, ensure_ascii=False)

    # Also append to the text file for compatibility with the old format.
    with open(output_path, "a", encoding="utf-8") as f:
        f.write(f"{email}:{password} | {status} | {timestamp}\n")

    logger.info(f"Account saved to accounts.json: {email}")
    logger.debug(f"Account saved to {output_file}: {email}")
async def run_batch_registration(config, num_accounts: int) -> List[Dict[str, Any]]:
    """
    Register accounts in batches (each task has its own retry handling).

    Tasks are started in groups of at most ``config.max_workers`` and each
    group is awaited before the next one starts.

    Args:
        config: Application config; ``max_workers`` and ``retry_limit`` are
            read here and the object is forwarded to every task.
        num_accounts: Number of accounts to register.

    Returns:
        One result dict per task, in task-id order. A task that raised is
        represented as ``{"task_id": ..., "status": "exception", "error": ...}``.
    """
    logger.info(f"Starting batch registration: {num_accounts} accounts")
    logger.info(f"Max workers: {config.max_workers}")
    logger.info(f"Retry limit: {config.retry_limit}")

    # A non-positive worker count degrades to one task at a time.
    batch_size = max(1, config.max_workers)
    all_results: List[Dict[str, Any]] = []

    for first_id in range(1, num_accounts + 1, batch_size):
        task_ids = range(first_id, min(first_id + batch_size, num_accounts + 1))
        outcomes = await asyncio.gather(
            *(register_account_with_retry(config, task_id=tid) for tid in task_ids),
            return_exceptions=True,
        )

        # Previously the gathered results were dropped and the function
        # returned None despite its annotation; collect them instead.
        for tid, outcome in zip(task_ids, outcomes):
            if isinstance(outcome, BaseException):
                logger.error(f"Task raised exception: {outcome}")
                all_results.append({
                    "task_id": tid,
                    "status": "exception",
                    "error": str(outcome)
                })
            else:
                all_results.append(outcome)

        # Short pause between batches to avoid rate limits; skipped after the
        # final batch, where it would only delay the return.
        if task_ids[-1] < num_accounts:
            await asyncio.sleep(random.uniform(1, 3))

    logger.info("Batch registration completed")
    return all_results
async def register_account_with_retry(
    config,
    task_id: int
) -> Dict[str, Any]:
    """
    Register one account, retrying on retryable outcomes.

    Up to ``config.retry_limit + 1`` attempts are made. An attempt is retried
    when its status is ``session_invalid``, ``rate_limited`` or ``failed``, or
    when it raises, as long as attempts remain; ``config.retry_delay`` seconds
    are slept between attempts.

    Args:
        config: Application config; ``retry_limit`` and ``retry_delay`` are read.
        task_id: Task number forwarded to ``register_account``.

    Returns:
        The result dict of the last attempt, or a ``status == "failed"`` dict
        when the last attempt raised.
    """
    limit = config.retry_limit
    retryable = ("session_invalid", "rate_limited", "failed")

    for attempt in range(limit + 1):
        can_retry = attempt < limit
        try:
            outcome = await register_account(config, task_id, attempt)

            # Status is looked up first so a missing key is handled by the
            # except branch on every attempt, including the last.
            if not (outcome["status"] in retryable and can_retry):
                return outcome

            logger.warning(
                f"[Task {task_id}] Retry {attempt + 1}/{config.retry_limit} "
                f"after {config.retry_delay}s delay"
            )
            await asyncio.sleep(config.retry_delay)
        except Exception as exc:
            logger.error(f"[Task {task_id}] Error in retry loop: {exc}")
            if not can_retry:
                return {
                    "task_id": task_id,
                    "status": "failed",
                    "error": str(exc)
                }
            await asyncio.sleep(config.retry_delay)

    # Only reached when the loop body never ran (negative retry_limit).
    return {"task_id": task_id, "status": "failed", "error": "Max retries exceeded"}
async def main():
    """
    Program entry point.

    Steps:
        1. Load the configuration and set up logging.
        2. Validate the configuration (asks for confirmation on warnings).
        3. Ask how many accounts to register and run the tasks in batches of
           at most ``config.max_workers``.
        4. Print a summary and, if ``config.log_to_file`` is set, dump the
           detailed results to ``output/logs/results_<unix-time>.json``.
    """
    print("=" * 70)
    print(" OpenAI 账号自动注册系统")
    print(" Version: 0.1.0")
    print("=" * 70)
    print()

    # Load configuration and configure logging.
    config = load_config()
    setup_logger(config.log_level)
    config.print_summary()

    # Validate configuration; let the user abort on warnings.
    warnings = config.validate_config()
    if warnings:
        logger.warning("Configuration warnings detected:")
        for warning in warnings:
            logger.warning(f" ⚠️ {warning}")
        print()
        user_input = input("Continue anyway? (y/N): ").strip().lower()
        if user_input != "y":
            logger.info("Aborted by user")
            return

    # Make sure the output directories exist.
    Path("output/logs").mkdir(parents=True, exist_ok=True)
    Path(config.accounts_output_file).parent.mkdir(parents=True, exist_ok=True)

    # Ask for the number of accounts.
    print()
    try:
        num_accounts = int(input("How many accounts to register? [default: 1]: ").strip() or "1")
        if num_accounts < 1:
            logger.error("Number of accounts must be at least 1")
            return
    except ValueError:
        logger.error("Invalid number")
        return

    print()
    logger.info(f"Will register {num_accounts} account(s)")
    logger.info(f"Output file: {config.accounts_output_file}")
    print()

    start_time = time.time()

    # A max_workers of 0 (or less) used to give a batch size <= 0, so the
    # task counter never advanced and the loop never ended. Clamp to 1.
    workers = max(1, config.max_workers)
    all_results = []

    for first_id in range(1, num_accounts + 1, workers):
        task_ids = range(first_id, min(first_id + workers, num_accounts + 1))

        logger.info(f"Executing batch: tasks {task_ids[0]} to {task_ids[-1]}")
        batch_results = await asyncio.gather(
            *(register_account_with_retry(config, task_id=tid) for tid in task_ids),
            return_exceptions=True,
        )

        # gather(return_exceptions=True) can hand back any BaseException
        # (e.g. CancelledError); turn every one into a plain record so the
        # summary code below can rely on dicts.
        for tid, result in zip(task_ids, batch_results):
            if isinstance(result, BaseException):
                logger.error(f"Task raised exception: {result}")
                all_results.append({
                    "task_id": tid,
                    "status": "exception",
                    "error": str(result)
                })
            else:
                all_results.append(result)

        # Pause between batches to avoid rate limits (not after the last one).
        if task_ids[-1] < num_accounts:
            delay = random.uniform(2, 5)
            logger.info(f"Waiting {delay:.1f}s before next batch...")
            await asyncio.sleep(delay)

    elapsed_time = time.time() - start_time

    # Summary.
    print()
    print("=" * 70)
    print(" Registration Summary")
    print("=" * 70)

    status_counts = {}
    for result in all_results:
        status = result.get("status", "unknown")
        status_counts[status] = status_counts.get(status, 0) + 1

    # total >= 1 here: num_accounts >= 1 and every task yields one record.
    total = len(all_results)
    success = status_counts.get("success", 0)
    pending = status_counts.get("pending_manual", 0)
    failed = total - success - pending

    print(f"Total tasks: {total}")
    print(f"✅ Success: {success} ({success/total*100:.1f}%)")
    print(f"⚠️ Pending manual: {pending} ({pending/total*100:.1f}%)")
    print(f"❌ Failed: {failed} ({failed/total*100:.1f}%)")
    print()
    print("Status breakdown:")
    for status, count in sorted(status_counts.items()):
        print(f" - {status}: {count}")
    print()
    print(f"Time elapsed: {elapsed_time:.1f}s")
    print(f"Average time per account: {elapsed_time/total:.1f}s")
    print("=" * 70)

    # Optionally dump the detailed results as JSON.
    if config.log_to_file:
        result_file = f"output/logs/results_{int(time.time())}.json"
        with open(result_file, "w", encoding="utf-8") as f:
            json.dump(all_results, f, indent=2, ensure_ascii=False)
        logger.info(f"Detailed results saved to: {result_file}")

    print()
    if success > 0:
        logger.success(f"{success} account(s) saved to: {config.accounts_output_file}")
    if pending > 0:
        logger.warning(f"⚠️ {pending} account(s) need manual completion: accounts_pending.txt")
    print()
    logger.info("Program finished")
def _mask_proxy(proxy: str) -> str:
"""
脱敏代理地址(隐藏用户名和密码)
例如: http://user:pass@1.2.3.4:8080 -> http://***:***@1.2.3.4:8080
"""
import re
return re.sub(r'://([^:]+):([^@]+)@', r'://***:***@', proxy)
if __name__ == "__main__":
    # Script entry point: run the async main() and turn failures into a
    # non-zero exit status.
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print()
        logger.warning("⚠️ Interrupted by user (Ctrl+C)")
        print()
    except Exception as e:
        logger.exception("Fatal error in main program")
        print()
        print(f"❌ Fatal error: {e}")
        # The builtin exit() is injected by the `site` module for interactive
        # use and is missing under `python -S` or in frozen builds; raising
        # SystemExit gives the same exit status without that dependency.
        raise SystemExit(1)