#!/usr/bin/env python3
"""
Log in to an OpenAI account and obtain its tokens.

Usage:
    python login.py
    python login.py email@example.com password123
"""

import asyncio
import getpass
import json
import sys
from datetime import datetime
from pathlib import Path
from typing import Optional

from core.session import OAISession
from core.login_flow import LoginFlow
from config import load_config
from utils.logger import logger, setup_logger


async def login_and_get_token(email: str, password: str, proxy: Optional[str] = None) -> dict:
    """Run the login flow for one account and return its result.

    Args:
        email: Login e-mail address.
        password: Login password.
        proxy: Proxy address (optional).

    Returns:
        The result dictionary produced by ``LoginFlow.run()``.
    """
    session = None
    try:
        # Create the session, then drive the login flow through it.
        session = OAISession(proxy=proxy, impersonate="chrome124")
        flow = LoginFlow(session, email, password)
        return await flow.run()
    finally:
        # Always release the session; it is still None if construction failed.
        if session:
            session.close()


def _load_accounts(json_file: Path) -> list:
    """Return the list stored in *json_file*, or ``[]`` if it is missing or unusable."""
    if not json_file.exists():
        return []

    try:
        with open(json_file, "r", encoding="utf-8") as f:
            accounts = json.load(f)
    except (OSError, ValueError) as exc:
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        # Stay best-effort (start from an empty list) but do not hide the problem.
        logger.warning(f"Could not read {json_file}, starting with an empty list: {exc}")
        return []

    if not isinstance(accounts, list):
        # Valid JSON of the wrong shape (e.g. an object) cannot be appended to.
        logger.warning(f"{json_file} does not contain a JSON list, starting with an empty list")
        return []

    return accounts


def save_token(result: dict) -> None:
    """Persist the tokens of a successful login result.

    Does nothing unless ``result["status"]`` is ``"success"``. Otherwise the
    tokens are written to two places in the current working directory:

    * ``accounts.json`` - the entry whose ``email`` matches is updated in
      place, or a new entry is appended when there is none;
    * ``tokens/<email with '@' replaced by '_at_'>.txt`` - a plain-text dump.

    Args:
        result: Login result dictionary; the keys read are ``status``,
            ``email``, ``access_token`` and ``session_token``.
    """
    if result.get("status") != "success":
        return

    email = result.get("email", "unknown")
    access_token = result.get("access_token", "")
    session_token = result.get("session_token", "")
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # Save to accounts.json.
    json_file = Path("accounts.json")
    accounts = _load_accounts(json_file)

    # Update the existing entry for this e-mail, or append a new one.
    for acc in accounts:
        if isinstance(acc, dict) and acc.get("email") == email:
            acc["access_token"] = access_token
            acc["session_token"] = session_token
            acc["token_updated"] = timestamp
            break
    else:
        accounts.append({
            "email": email,
            "access_token": access_token,
            "session_token": session_token,
            "timestamp": timestamp,
        })

    with open(json_file, "w", encoding="utf-8") as f:
        json.dump(accounts, f, indent=2, ensure_ascii=False)

    logger.info("Token saved to accounts.json")

    # Also save to a separate per-account file.
    token_dir = Path("tokens")
    token_dir.mkdir(exist_ok=True)

    token_file = token_dir / f"{email.replace('@', '_at_')}.txt"
    with open(token_file, "w", encoding="utf-8") as f:
        f.write(f"Email: {email}\n")
        f.write(f"Access Token: {access_token}\n")
        f.write(f"Session Token: {session_token}\n")
        f.write(f"Timestamp: {timestamp}\n")

    logger.info(f"Token saved to {token_file}")


async def main() -> None:
    """Command-line entry point: collect credentials, log in, save the tokens.

    Credentials are taken from ``sys.argv[1]`` and ``sys.argv[2]`` when both
    are given; otherwise they are prompted for interactively.
    """
    # Initialise logging.
    setup_logger()

    # Load configuration.
    config = load_config()

    # Obtain e-mail and password.
    if len(sys.argv) >= 3:
        email = sys.argv[1]
        password = sys.argv[2]
    else:
        print("\n=== OpenAI 账号登录 ===\n")
        email = input("Email: ").strip()
        # getpass keeps the password from being echoed to the terminal.
        password = getpass.getpass("Password: ").strip()

    if not email or not password:
        logger.error("Email and password are required")
        return

    # Pick a proxy from the configuration (may be falsy when none is set).
    proxy = config.proxy.get_next_proxy()
    if proxy:
        logger.info(f"Using proxy: {proxy[:30]}...")

    print()
    logger.info(f"Starting login for {email}")
    print()

    # Run the login.
    result = await login_and_get_token(email, password, proxy)

    print()
    if result.get("status") == "success":
        logger.success("✅ Login successful!")
        print()
        # Use .get like save_token does, so a missing or None token cannot raise here.
        access_token = result.get("access_token") or ""
        print(f"Access Token: {access_token[:80]}...")
        print()

        # Persist the tokens.
        save_token(result)
    else:
        logger.error(f"❌ Login failed: {result.get('error', 'Unknown error')}")

    print()


if __name__ == "__main__":
    asyncio.run(main())