140 lines
3.6 KiB
Python
140 lines
3.6 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
批量获取 OpenAI token
|
||
读取 needtoken.json,使用 flow_email_token.py 登录获取 token,保存为指定格式
|
||
"""
|
||
|
||
import json
|
||
import sys
|
||
import os
|
||
import time
|
||
from pathlib import Path
|
||
|
||
# 添加项目根目录到 Python 路径
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from flow.flow_email_token import OpenAILogin
|
||
|
||
|
||
def load_accounts(json_path: str):
|
||
"""加载账号列表"""
|
||
with open(json_path, 'r', encoding='utf-8') as f:
|
||
return json.load(f)
|
||
|
||
|
||
def save_results(results: list, output_path: str):
|
||
"""保存结果到文件"""
|
||
with open(output_path, 'w', encoding='utf-8') as f:
|
||
# 写入表头
|
||
f.write("账号 | 密码 | token\n")
|
||
f.write("-" * 100 + "\n")
|
||
|
||
# 写入数据
|
||
for item in results:
|
||
email = item['email']
|
||
password = item['password']
|
||
token = item.get('token', 'FAILED')
|
||
f.write(f"{email} | {password} | {token}\n")
|
||
|
||
print(f"\n✅ 结果已保存到: {output_path}")
|
||
|
||
|
||
def batch_get_tokens(json_path: str, output_path: str, delay: int = 5):
|
||
"""批量获取 token"""
|
||
|
||
print(f"{'='*80}")
|
||
print(f"批量获取 OpenAI Token")
|
||
print(f"{'='*80}\n")
|
||
|
||
# 加载账号列表
|
||
print(f"📂 加载账号列表: {json_path}")
|
||
accounts = load_accounts(json_path)
|
||
total = len(accounts)
|
||
print(f" 找到 {total} 个账号\n")
|
||
|
||
# 结果列表
|
||
results = []
|
||
success_count = 0
|
||
failed_count = 0
|
||
|
||
# 逐个登录获取 token
|
||
for idx, account in enumerate(accounts, 1):
|
||
email = account['email']
|
||
password = account['password']
|
||
|
||
print(f"\n{'='*80}")
|
||
print(f"[{idx}/{total}] 正在处理: {email}")
|
||
print(f"{'='*80}")
|
||
|
||
try:
|
||
# 创建登录客户端
|
||
client = OpenAILogin()
|
||
|
||
# 执行登录
|
||
access_token = client.login(email, password)
|
||
|
||
if access_token:
|
||
print(f"\n✅ [{idx}/{total}] 成功获取 token")
|
||
results.append({
|
||
'email': email,
|
||
'password': password,
|
||
'token': access_token
|
||
})
|
||
success_count += 1
|
||
else:
|
||
print(f"\n❌ [{idx}/{total}] 登录失败")
|
||
results.append({
|
||
'email': email,
|
||
'password': password,
|
||
'token': 'FAILED'
|
||
})
|
||
failed_count += 1
|
||
|
||
except Exception as e:
|
||
print(f"\n❌ [{idx}/{total}] 异常: {e}")
|
||
results.append({
|
||
'email': email,
|
||
'password': password,
|
||
'token': f'ERROR: {str(e)}'
|
||
})
|
||
failed_count += 1
|
||
|
||
# 延迟(避免频繁请求)
|
||
if idx < total:
|
||
print(f"\n⏳ 等待 {delay} 秒后继续...")
|
||
time.sleep(delay)
|
||
|
||
# 保存结果
|
||
print(f"\n{'='*80}")
|
||
print(f"批量处理完成")
|
||
print(f"{'='*80}")
|
||
print(f"总数: {total}")
|
||
print(f"成功: {success_count}")
|
||
print(f"失败: {failed_count}")
|
||
print(f"{'='*80}\n")
|
||
|
||
save_results(results, output_path)
|
||
|
||
return results
|
||
|
||
|
||
def main():
|
||
"""主函数"""
|
||
|
||
# 默认路径
|
||
json_path = "needtoken.json"
|
||
output_path = "tokens_result.txt"
|
||
delay = 5 # 每个账号之间的延迟(秒)
|
||
|
||
# 检查输入文件是否存在
|
||
if not os.path.exists(json_path):
|
||
print(f"❌ 文件不存在: {json_path}")
|
||
return
|
||
|
||
# 执行批量获取
|
||
batch_get_tokens(json_path, output_path, delay)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|