155 lines
3.8 KiB
Python
155 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
OpenAI 账号登录获取 Token
|
|
|
|
使用方法:
|
|
python login.py
|
|
python login.py email@example.com password123
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
import json
|
|
from pathlib import Path
|
|
from datetime import datetime
|
|
|
|
from core.session import OAISession
|
|
from core.login_flow import LoginFlow
|
|
from config import load_config
|
|
from utils.logger import logger, setup_logger
|
|
|
|
|
|
async def login_and_get_token(email: str, password: str, proxy: str = None) -> dict:
|
|
"""
|
|
登录并获取 access_token
|
|
|
|
参数:
|
|
email: 登录邮箱
|
|
password: 登录密码
|
|
proxy: 代理地址(可选)
|
|
|
|
返回:
|
|
登录结果字典
|
|
"""
|
|
session = None
|
|
try:
|
|
# 创建会话
|
|
session = OAISession(proxy=proxy, impersonate="chrome124")
|
|
|
|
# 执行登录流程
|
|
flow = LoginFlow(session, email, password)
|
|
result = await flow.run()
|
|
|
|
return result
|
|
|
|
finally:
|
|
if session:
|
|
session.close()
|
|
|
|
|
|
def save_token(result: dict):
|
|
"""保存 token 到文件"""
|
|
if result.get("status") != "success":
|
|
return
|
|
|
|
email = result.get("email", "unknown")
|
|
access_token = result.get("access_token", "")
|
|
session_token = result.get("session_token", "")
|
|
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
# 保存到 accounts.json
|
|
json_file = Path("accounts.json")
|
|
accounts = []
|
|
if json_file.exists():
|
|
try:
|
|
with open(json_file, "r", encoding="utf-8") as f:
|
|
accounts = json.load(f)
|
|
except:
|
|
accounts = []
|
|
|
|
# 查找是否已存在该邮箱,更新 token
|
|
found = False
|
|
for acc in accounts:
|
|
if acc.get("email") == email:
|
|
acc["access_token"] = access_token
|
|
acc["session_token"] = session_token
|
|
acc["token_updated"] = timestamp
|
|
found = True
|
|
break
|
|
|
|
if not found:
|
|
accounts.append({
|
|
"email": email,
|
|
"access_token": access_token,
|
|
"session_token": session_token,
|
|
"timestamp": timestamp
|
|
})
|
|
|
|
with open(json_file, "w", encoding="utf-8") as f:
|
|
json.dump(accounts, f, indent=2, ensure_ascii=False)
|
|
|
|
logger.info(f"Token saved to accounts.json")
|
|
|
|
# 同时保存到单独文件
|
|
token_dir = Path("tokens")
|
|
token_dir.mkdir(exist_ok=True)
|
|
token_file = token_dir / f"{email.replace('@', '_at_')}.txt"
|
|
with open(token_file, "w", encoding="utf-8") as f:
|
|
f.write(f"Email: {email}\n")
|
|
f.write(f"Access Token: {access_token}\n")
|
|
f.write(f"Session Token: {session_token}\n")
|
|
f.write(f"Timestamp: {timestamp}\n")
|
|
|
|
logger.info(f"Token saved to {token_file}")
|
|
|
|
|
|
async def main():
|
|
# 初始化日志
|
|
setup_logger()
|
|
|
|
# 加载配置
|
|
config = load_config()
|
|
|
|
# 获取邮箱和密码
|
|
if len(sys.argv) >= 3:
|
|
email = sys.argv[1]
|
|
password = sys.argv[2]
|
|
else:
|
|
print("\n=== OpenAI 账号登录 ===\n")
|
|
email = input("Email: ").strip()
|
|
password = input("Password: ").strip()
|
|
|
|
if not email or not password:
|
|
logger.error("Email and password are required")
|
|
return
|
|
|
|
# 获取代理
|
|
proxy = config.proxy.get_next_proxy()
|
|
if proxy:
|
|
logger.info(f"Using proxy: {proxy[:30]}...")
|
|
|
|
print()
|
|
logger.info(f"Starting login for {email}")
|
|
print()
|
|
|
|
# 执行登录
|
|
result = await login_and_get_token(email, password, proxy)
|
|
|
|
print()
|
|
if result.get("status") == "success":
|
|
logger.success(f"✅ Login successful!")
|
|
print()
|
|
print(f"Access Token: {result['access_token'][:80]}...")
|
|
print()
|
|
|
|
# 保存 token
|
|
save_token(result)
|
|
else:
|
|
logger.error(f"❌ Login failed: {result.get('error', 'Unknown error')}")
|
|
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(main())
|