Files
codexTool/auto_gpt_team.py
2026-01-27 09:25:04 +08:00

2514 lines
94 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Author: muyyg
Project: Subscription Automation (DrissionPage Version)
Created: 2026-01-12
Version: 3.1-hybrid (支持协议模式)
模式说明:
- browser: 浏览器自动化模式 (默认),全程使用 DrissionPage 浏览器自动化
- api: 协议模式,使用 API 快速完成注册,仅支付环节使用浏览器
"""
import time
import random
import string
import re
import sys
import os
import platform
import subprocess
import tempfile
import requests
from pathlib import Path
from DrissionPage import ChromiumPage, ChromiumOptions
# 导入协议模式模块
try:
from api_register import (
ChatGPTAPIRegister,
api_register_flow,
api_login_flow,
is_api_mode_available,
get_verification_code_api,
ShutdownRequested,
)
API_MODE_AVAILABLE = is_api_mode_available()
except ImportError:
API_MODE_AVAILABLE = False
ChatGPTAPIRegister = None
api_register_flow = None
api_login_flow = None
ShutdownRequested = Exception # 回退到基础异常类
# ================= 配置加载 =================
try:
import tomllib
except ImportError:
try:
import tomli as tomllib
except ImportError:
tomllib = None
BASE_DIR = Path(__file__).parent
CONFIG_FILE = BASE_DIR / "config.toml"
def _load_config():
"""从 config.toml 加载配置"""
if tomllib is None or not CONFIG_FILE.exists():
return {}
try:
with open(CONFIG_FILE, "rb") as f:
return tomllib.load(f)
except Exception:
return {}
_cfg = _load_config()
_autogptplus = _cfg.get("autogptplus", {})
# ================= 核心配置区域 =================
# 从 config.toml [autogptplus] 读取,如未配置则使用默认值
# 1. 管理员 Token
MAIL_API_TOKEN = _autogptplus.get("mail_api_token", "")
# 2. 你的域名后缀(随机选择)
EMAIL_DOMAINS = _autogptplus.get("email_domains", [])
# 3. Cloud-Mail 部署地址
MAIL_API_BASE = _autogptplus.get("mail_api_base", "")
# 4. 接口路径 (邮件查询)
MAIL_API_PATH = "/api/public/emailList"
# 5. SEPA IBAN 列表 (从配置文件读取)
SEPA_IBANS = _autogptplus.get("sepa_ibans", [])
# 6. 随机指纹开关
RANDOM_FINGERPRINT = _autogptplus.get("random_fingerprint", True)
# 7. 注册模式: "browser" (浏览器自动化) 或 "api" (协议模式)
# 默认使用 API 模式(更快),如果 curl_cffi 不可用则自动回退到浏览器模式
REGISTER_MODE = _autogptplus.get("register_mode", "api")
# 8. 协议模式代理 (仅协议模式使用)
API_PROXY = _autogptplus.get("api_proxy", "")
# ================= 浏览器指纹 =================
FINGERPRINTS = [
# NVIDIA 显卡
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1440}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4080 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 3840, "height": 2160}
},
# AMD 显卡
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (AMD)",
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6800 XT Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1440}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (AMD)",
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 7900 XTX Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 3840, "height": 2160}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (AMD)",
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6700 XT Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
# Intel 显卡
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (Intel)",
"webgl_renderer": "ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (Intel)",
"webgl_renderer": "ANGLE (Intel, Intel(R) Iris Xe Graphics Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (Intel)",
"webgl_renderer": "ANGLE (Intel, Intel(R) Arc A770 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1440}
},
# 笔记本配置
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3050 Laptop GPU Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 Laptop GPU Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1600}
},
]
def get_random_fingerprint() -> dict:
"""随机获取一个浏览器指纹"""
return random.choice(FINGERPRINTS)
def inject_fingerprint(page, fingerprint: dict):
"""注入浏览器指纹伪装脚本"""
global _last_log_message, _last_log_time
try:
webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. (NVIDIA)")
webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)")
plat = fingerprint.get("platform", "Win32")
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
# 使用 try-catch 包裹每个属性定义,避免 Linux 上属性不可重定义的错误
js_script = f'''
(function() {{
// 伪装 WebGL 指纹
try {{
const getParameterProxyHandler = {{
apply: function(target, thisArg, args) {{
const param = args[0];
if (param === 37445) return "{webgl_vendor}";
if (param === 37446) return "{webgl_renderer}";
return Reflect.apply(target, thisArg, args);
}}
}};
const originalGetParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler);
if (typeof WebGL2RenderingContext !== 'undefined') {{
const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter;
WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler);
}}
}} catch(e) {{}}
// 伪装 platform
try {{ Object.defineProperty(navigator, 'platform', {{ get: () => "{plat}", configurable: true }}); }} catch(e) {{}}
// 伪装屏幕分辨率
try {{
Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]}, configurable: true }});
Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]}, configurable: true }});
Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]}, configurable: true }});
Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]}, configurable: true }});
}} catch(e) {{}}
// 隐藏 webdriver 特征
try {{ Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined, configurable: true }}); }} catch(e) {{}}
// 伪装 languages
try {{ Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"], configurable: true }}); }} catch(e) {{}}
// 伪装 plugins
try {{
Object.defineProperty(navigator, 'plugins', {{
get: () => [
{{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }},
{{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }},
{{ name: "Native Client", filename: "internal-nacl-plugin" }}
],
configurable: true
}});
}} catch(e) {{}}
}})();
'''
page.run_js(js_script)
# 获取浏览器语言设置
try:
browser_lang = page.run_js('return navigator.language || navigator.userLanguage || "unknown"')
except:
browser_lang = "unknown"
# 避免重复日志1秒内相同消息不重复输出
current_time = time.time()
log_msg = f"已注入: {webgl_renderer} | {screen['width']}x{screen['height']} | 语言: {browser_lang}"
if log_msg != _last_log_message or current_time - _last_log_time > 1:
log_status("指纹", log_msg)
_last_log_message = log_msg
_last_log_time = current_time
except Exception as e:
log_status("指纹", f"注入失败: {e}")
# ================= IBAN 管理函数 =================
IBAN_FILE = BASE_DIR / "sepa_ibans.txt"
def load_ibans_from_file():
"""从文件加载 IBAN 列表"""
if not IBAN_FILE.exists():
return []
try:
with open(IBAN_FILE, "r", encoding="utf-8") as f:
ibans = [line.strip() for line in f if line.strip() and line.strip().startswith("DE")]
return ibans
except Exception:
return []
def save_ibans_to_file(ibans: list):
"""保存 IBAN 列表到文件"""
try:
with open(IBAN_FILE, "w", encoding="utf-8") as f:
f.write("\n".join(ibans))
return True
except Exception:
return False
def get_sepa_ibans():
"""获取 SEPA IBAN 列表 (优先从文件读取)"""
file_ibans = load_ibans_from_file()
if file_ibans:
return file_ibans
return SEPA_IBANS
def add_sepa_ibans(new_ibans: list) -> tuple:
"""添加 IBAN 到列表
Args:
new_ibans: 新的 IBAN 列表
Returns:
tuple: (添加数量, 跳过数量, 当前总数)
"""
current = set(load_ibans_from_file())
added = 0
skipped = 0
for iban in new_ibans:
iban = iban.strip().upper()
if not iban or not iban.startswith("DE"):
continue
if iban in current:
skipped += 1
else:
current.add(iban)
added += 1
save_ibans_to_file(sorted(current))
return added, skipped, len(current)
def clear_sepa_ibans():
"""清空 IBAN 列表"""
if IBAN_FILE.exists():
IBAN_FILE.unlink()
return True
# ================= 域名管理函数 =================
DOMAIN_FILE = BASE_DIR / "email_domains.txt"
def load_domains_from_file():
"""从文件加载域名列表"""
if not DOMAIN_FILE.exists():
return []
try:
with open(DOMAIN_FILE, "r", encoding="utf-8") as f:
domains = [line.strip() for line in f if line.strip() and line.strip().startswith("@")]
return domains
except Exception:
return []
def save_domains_to_file(domains: list):
"""保存域名列表到文件"""
try:
with open(DOMAIN_FILE, "w", encoding="utf-8") as f:
f.write("\n".join(domains))
return True
except Exception:
return False
def get_email_domains():
"""获取邮箱域名列表 (合并文件和配置)"""
file_domains = set(load_domains_from_file())
config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set()
# 合并两个来源的域名
all_domains = file_domains | config_domains
return sorted(all_domains) if all_domains else []
def validate_domain_format(domain: str) -> tuple:
"""验证域名格式是否正确
Args:
domain: 要验证的域名 (带或不带@前缀)
Returns:
tuple: (是否有效, 标准化的域名或错误信息)
"""
domain = domain.strip().lower()
# 移除开头的引号和尾部特殊字符
domain = domain.strip('"\'')
# 确保以 @ 开头
if not domain.startswith("@"):
domain = "@" + domain
# 移除尾部可能的引号或逗号
domain = domain.rstrip('",\'')
# 基本长度检查 (至少 @x.y)
if len(domain) < 4:
return False, "域名太短"
# 提取 @ 后面的部分进行验证
domain_part = domain[1:] # 去掉 @
# 检查是否包含至少一个点
if "." not in domain_part:
return False, "域名缺少点号"
# 检查点的位置 (不能在开头或结尾)
if domain_part.startswith(".") or domain_part.endswith("."):
return False, "点号位置不正确"
# 检查不能有连续的点
if ".." in domain_part:
return False, "不能有连续的点号"
# 检查每个部分是否有效
parts = domain_part.split(".")
for part in parts:
if not part:
return False, "域名部分为空"
# 检查是否只包含有效字符 (字母、数字、连字符)
if not all(c.isalnum() or c == "-" for c in part):
return False, f"域名包含无效字符"
# 不能以连字符开头或结尾
if part.startswith("-") or part.endswith("-"):
return False, "域名部分不能以连字符开头或结尾"
# 顶级域名至少2个字符
if len(parts[-1]) < 2:
return False, "顶级域名太短"
return True, domain
def add_email_domains(new_domains: list) -> tuple:
"""添加域名到列表
Args:
new_domains: 新的域名列表
Returns:
tuple: (添加数量, 跳过数量, 无效数量, 当前总数)
"""
# 获取当前所有域名(文件 + 配置)
current = set(load_domains_from_file())
config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set()
all_existing = current | config_domains
added = 0
skipped = 0
invalid = 0
for domain in new_domains:
# 验证域名格式
is_valid, result = validate_domain_format(domain)
if not is_valid:
invalid += 1
continue
domain = result # 使用标准化后的域名
if domain in all_existing:
skipped += 1
else:
current.add(domain)
all_existing.add(domain)
added += 1
# 只保存通过 Bot 添加的域名到文件
save_domains_to_file(sorted(current))
return added, skipped, invalid, len(all_existing)
def remove_email_domain(domain: str) -> bool:
"""删除指定域名 (只能删除通过 Bot 添加的域名)
Args:
domain: 要删除的域名
Returns:
bool: 是否删除成功
"""
current = set(load_domains_from_file())
domain = domain.strip().lower()
if not domain.startswith("@"):
domain = "@" + domain
if domain in current:
current.remove(domain)
save_domains_to_file(sorted(current))
return True
return False
def get_file_domains_count() -> int:
"""获取txt文件中的域名数量 (不包含config配置的)"""
return len(load_domains_from_file())
def clear_email_domains() -> int:
"""清空域名列表 (只清空txt文件保留config配置)
Returns:
int: 被清空的域名数量
"""
count = len(load_domains_from_file())
if DOMAIN_FILE.exists():
DOMAIN_FILE.unlink()
return count
# ================= 固定配置 =================
TARGET_URL = "https://chatgpt.com"
def generate_random_birthday():
"""生成随机生日 (2000-2004年)"""
year = random.randint(2000, 2004)
month = random.randint(1, 12)
# 根据月份确定天数
if month in [1, 3, 5, 7, 8, 10, 12]:
max_day = 31
elif month in [4, 6, 9, 11]:
max_day = 30
else: # 2月
max_day = 29 if year % 4 == 0 else 28
day = random.randint(1, max_day)
return str(year), f"{month:02d}", f"{day:02d}"
def _detect_page_language(page) -> str:
"""检测页面语言
Returns:
str: 'zh' 中文, 'en' 英文
"""
try:
html = page.html
# 检查中文关键词
chinese_keywords = ['确认', '继续', '登录', '注册', '验证', '邮箱', '密码', '姓名', '生日']
for keyword in chinese_keywords:
if keyword in html:
return 'zh'
return 'en'
except Exception:
return 'en' # 默认英文
def _input_birthday_precise(page, birth_year: str, birth_month: str, birth_day: str) -> bool:
"""精确输入生日 (使用 data-type 选择器定位输入框)
中文界面: yyyy/mm/dd (年/月/日)
英文界面: mm/dd/yyyy (月/日/年)
Args:
page: 浏览器页面对象
birth_year: 年份 (如 "2000")
birth_month: 月份 (如 "07")
birth_day: 日期 (如 "15")
Returns:
bool: 是否成功
"""
try:
# 使用 data-type 属性精确定位三个输入框
year_input = page.ele('css:[data-type="year"]', timeout=5)
month_input = page.ele('css:[data-type="month"]', timeout=3)
day_input = page.ele('css:[data-type="day"]', timeout=3)
if not all([year_input, month_input, day_input]):
log_progress("[!] 未找到完整的生日输入框 (data-type),尝试备用方案...")
return False
# 检测页面语言
lang = _detect_page_language(page)
log_progress(f"生日: {birth_year}-{birth_month}-{birth_day} (界面: {lang})")
# 根据语言决定输入顺序
if lang == 'zh':
# 中文: 年 -> 月 -> 日
input_order = [(year_input, birth_year),
(month_input, birth_month),
(day_input, birth_day)]
else:
# 英文: 月 -> 日 -> 年
input_order = [(month_input, birth_month),
(day_input, birth_day),
(year_input, birth_year)]
# 按顺序输入
for input_elem, value in input_order:
try:
input_elem.click()
time.sleep(0.15)
input_elem.input(value, clear=True)
time.sleep(0.2)
except Exception as e:
log_progress(f"[!] 生日字段输入异常: {e}")
return False
log_progress("[OK] 生日已输入 (精确模式)")
return True
except Exception as e:
log_progress(f"[!] 精确生日输入失败: {e}")
return False
def _input_birthday_fallback(page, birth_year: str, birth_month: str, birth_day: str):
"""备用生日输入方式 (Tab 键切换 + 逐字符输入)
Args:
page: 浏览器页面对象
birth_year: 年份
birth_month: 月份
birth_day: 日期
"""
log_progress(f"生日: {birth_year}-{birth_month}-{birth_day} (备用模式)")
# 使用 Tab 键切换到生日字段
page.actions.key_down('Tab').key_up('Tab')
time.sleep(0.3)
# 逐字符输入
birth_str = birth_year + birth_month + birth_day
for digit in birth_str:
page.actions.type(digit)
time.sleep(0.1)
log_progress("[OK] 生日已输入 (备用模式)")
# 地址格式: (街道, 邮编, 城市)
SEPA_ADDRESSES = [
# 柏林
("Alexanderplatz 1", "10178", "Berlin"),
("Unter den Linden 77", "10117", "Berlin"),
("Kurfürstendamm 21", "10719", "Berlin"),
("Friedrichstraße 43", "10117", "Berlin"),
("Potsdamer Platz 1", "10785", "Berlin"),
("Tauentzienstraße 9", "10789", "Berlin"),
# 慕尼黑
("Marienplatz 8", "80331", "München"),
("Leopoldstraße 32", "80802", "München"),
("Maximilianstraße 17", "80539", "München"),
("Kaufingerstraße 28", "80331", "München"),
("Sendlinger Straße 3", "80331", "München"),
# 汉堡
("Mönckebergstraße 16", "20095", "Hamburg"),
("Jungfernstieg 38", "20354", "Hamburg"),
("Spitalerstraße 12", "20095", "Hamburg"),
("Neuer Wall 50", "20354", "Hamburg"),
# 法兰克福
("Zeil 106", "60313", "Frankfurt am Main"),
("Kaiserstraße 62", "60329", "Frankfurt am Main"),
("Goethestraße 1", "60313", "Frankfurt am Main"),
("Große Bockenheimer Str. 2", "60313", "Frankfurt am Main"),
# 科隆
("Hohe Straße 111", "50667", "Köln"),
("Schildergasse 24", "50667", "Köln"),
("Breite Straße 80", "50667", "Köln"),
# 斯图加特
("Königstraße 2", "70173", "Stuttgart"),
("Calwer Straße 19", "70173", "Stuttgart"),
("Schulstraße 5", "70173", "Stuttgart"),
# 杜塞尔多夫
("Königsallee 60", "40212", "Düsseldorf"),
("Schadowstraße 11", "40212", "Düsseldorf"),
("Flinger Straße 36", "40213", "Düsseldorf"),
# 莱比锡
("Grimmaische Straße 25", "04109", "Leipzig"),
("Petersstraße 36", "04109", "Leipzig"),
# 德累斯顿
("Prager Straße 12", "01069", "Dresden"),
("Altmarkt 10", "01067", "Dresden"),
# 纽伦堡
("Karolinenstraße 12", "90402", "Nürnberg"),
("Breite Gasse 23", "90402", "Nürnberg"),
# 汉诺威
("Georgstraße 10", "30159", "Hannover"),
("Bahnhofstraße 5", "30159", "Hannover"),
# 不来梅
("Obernstraße 2", "28195", "Bremen"),
("Sögestraße 18", "28195", "Bremen"),
]
FIRST_NAMES = [
# 德国常见男性名
"Lukas", "Leon", "Maximilian", "Felix", "Paul", "Jonas", "Tim", "David",
"Niklas", "Jan", "Philipp", "Moritz", "Alexander", "Sebastian", "Florian",
"Julian", "Tobias", "Simon", "Daniel", "Christian", "Markus", "Thomas",
"Michael", "Stefan", "Andreas", "Martin", "Matthias", "Benjamin", "Patrick",
# 德国常见女性名
"Anna", "Laura", "Julia", "Lena", "Sarah", "Lisa", "Marie", "Sophie",
"Katharina", "Hannah", "Emma", "Mia", "Lea", "Johanna", "Clara",
"Charlotte", "Emilia", "Luisa", "Nina", "Elena", "Melanie", "Christina",
"Sandra", "Nicole", "Sabine", "Claudia", "Petra", "Monika", "Stefanie",
]
LAST_NAMES = [
# 德国最常见姓氏
"Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner",
"Becker", "Schulz", "Hoffmann", "Schäfer", "Koch", "Bauer", "Richter",
"Klein", "Wolf", "Schröder", "Neumann", "Schwarz", "Zimmermann", "Braun",
"Krüger", "Hofmann", "Hartmann", "Lange", "Schmitt", "Werner", "Schmitz",
"Krause", "Meier", "Lehmann", "Schmid", "Schulze", "Maier", "Köhler",
"Herrmann", "König", "Walter", "Mayer", "Huber", "Kaiser", "Fuchs",
"Peters", "Lang", "Scholz", "Möller", "Weiß", "Jung", "Hahn", "Schubert",
]
# ================= 工具函数 =================
# 日志去重缓存
_last_log_message = ""
_last_log_time = 0
def cleanup_chrome_processes():
"""清理残留的 Chrome 进程 (跨平台支持)"""
global _last_log_message, _last_log_time
try:
if platform.system() == "Windows":
# Windows: 使用 taskkill 清理 chromedriver 和 chrome
try:
subprocess.run(
['taskkill', '/F', '/IM', 'chromedriver.exe'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程 (带 --headless 参数的)
try:
result = subprocess.run(
['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
pid = line.strip()
if pid.isdigit():
subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
except Exception:
pass
else:
# Linux/Mac: 使用 pkill
try:
subprocess.run(
['pkill', '-f', 'chromedriver'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程
try:
subprocess.run(
['pkill', '-f', 'chrome.*--headless'],
capture_output=True, timeout=5
)
except Exception:
pass
# 避免重复日志1秒内相同消息不重复输出
current_time = time.time()
log_msg = "已清理 Chrome 残留进程"
if log_msg != _last_log_message or current_time - _last_log_time > 1:
log_status("清理", log_msg)
_last_log_message = log_msg
_last_log_time = current_time
except Exception:
pass # 静默处理,不影响主流程
def log_status(step, message):
timestamp = time.strftime("%H:%M:%S")
print(f"[{timestamp}] [{step}] {message}")
sys.stdout.flush()
def log_progress(message):
print(f" {message}")
sys.stdout.flush()
def save_account(email, password, token, account_id=""):
"""保存账号信息到 JSON 文件"""
import json
accounts_file = "accounts.json"
# 读取现有账号
accounts = []
try:
with open(accounts_file, 'r', encoding='utf-8') as f:
accounts = json.load(f)
except:
pass
# 添加新账号
account_data = {
"account": email,
"password": password,
"token": token
}
if account_id:
account_data["account_id"] = account_id
accounts.append(account_data)
# 保存
with open(accounts_file, 'w', encoding='utf-8') as f:
json.dump(accounts, f, ensure_ascii=False, indent=2)
log_status("保存", f"账号已保存到 {accounts_file}")
def get_verification_content(target_email, max_retries=90):
log_status("API监听", f"正在监听邮件 ({MAIL_API_PATH})...")
headers = {
"Authorization": MAIL_API_TOKEN,
"Content-Type": "application/json"
}
start_time = time.time()
for i in range(max_retries):
elapsed = int(time.time() - start_time)
try:
url = f"{MAIL_API_BASE}{MAIL_API_PATH}"
payload = {
"toEmail": target_email,
"timeSort": "desc",
"size": 20
}
resp = requests.post(url, headers=headers, json=payload, timeout=10)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 200:
mails = data.get('data', [])
if mails:
for mail in mails:
log_status("捕获", "✅ 成功抓取到目标邮件!")
html_body = mail.get('content') or mail.get('text') or str(mail)
# 提取验证码
code_match = re.search(r'\b(\d{6})\b', html_body)
if code_match:
code = code_match.group(1)
log_status("解析", f"提取到验证码: {code}")
return {"type": "code", "val": code}
# 提取链接
link_match = re.search(r'href="(https://.*openai\.com/.*verification.*)"', html_body)
if not link_match:
link_match = re.search(r'href="(https://auth0\.openai\.com/u/login/identifier\?state=[^"]+)"', html_body)
if link_match:
link = link_match.group(1)
log_status("解析", f"提取到链接: {link[:30]}...")
return {"type": "link", "val": link}
except:
pass
if i % 5 == 0:
print(f" [监听中] 已耗时 {elapsed}秒...")
sys.stdout.flush()
time.sleep(2)
log_status("超时", "[X] 未能获取验证码。")
return None
def _is_shutdown_requested():
"""检查是否收到停止请求"""
try:
import run
return run._shutdown_requested
except Exception:
return False
def _is_connection_lost(error_msg):
"""检查是否是连接断开错误(通常由 /stop 命令导致)"""
disconnect_keywords = [
"connection to the page has been disconnected",
"page has been closed",
"target closed",
"session closed",
"browser has disconnected",
"no such window",
"invalid session id"
]
error_lower = str(error_msg).lower()
return any(kw in error_lower for kw in disconnect_keywords)
def run_payment_flow(page, email, step_callback=None):
"""执行 SEPA 支付流程 - 严格按顺序执行,任一步骤失败则终止
Args:
page: 浏览器页面对象
email: 邮箱地址
step_callback: 步骤回调函数 (step: str)
Returns:
dict: 成功时返回 {"token": ..., "account_id": ...}
被停止时返回 {"stopped": True}
失败时返回 None
"""
def step_cb(step):
if step_callback:
step_callback(step)
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求,中断支付流程")
return {"stopped": True}
log_status("支付流程", "开始处理 Stripe 支付页...")
try:
# 等待支付页加载完成
step_cb("等待支付页加载...")
log_status("支付页", "等待支付页加载...")
# 等待页面稳定Stripe 支付页需要时间渲染)
time.sleep(3)
# 尝试多种方式定位邮箱输入框
email_input = None
for attempt in range(3):
# 方式1: 直接 ID
email_input = page.ele('#email', timeout=5)
if email_input:
break
# 方式2: name 属性
email_input = page.ele('@name=email', timeout=3)
if email_input:
break
# 方式3: CSS 选择器
email_input = page.ele('css:input[type="email"], input[id*="email"], input[name*="email"]', timeout=3)
if email_input:
break
log_progress(f"[尝试{attempt+1}] 等待邮箱输入框...")
time.sleep(2)
if email_input:
log_progress("[OK] 支付页已加载")
else:
log_progress("[!] 邮箱输入框未立即出现,继续尝试...")
time.sleep(3)
# 随机选择 IBAN 和地址
ibans = get_sepa_ibans()
if not ibans:
log_progress("[X] 没有可用的 IBAN请先通过 Bot 导入")
return None
sepa_iban = random.choice(ibans)
street, postal_code, city = random.choice(SEPA_ADDRESSES)
account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
log_progress(f"使用 IBAN: {sepa_iban[:8]}...")
log_progress(f"使用地址: {street}, {postal_code} {city}")
log_progress(f"账户名: {account_name}")
# ========== 步骤 1: 填写邮箱 ==========
step_cb("填写支付邮箱...")
log_progress("[1] 填写邮箱...")
try:
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求")
return {"stopped": True}
# 如果之前已经找到了,直接使用;否则重新查找
if not email_input:
email_input = page.ele('#email', timeout=10)
if not email_input:
email_input = page.ele('@name=email', timeout=5)
if not email_input:
email_input = page.ele('css:input[type="email"]', timeout=5)
if not email_input:
log_progress("[X] 邮箱输入框未找到")
log_progress(f"当前URL: {page.url}")
return None
email_input.clear()
email_input.input(email)
log_progress(f"[OK] 已填写邮箱: {email}")
time.sleep(1)
except Exception as e:
error_msg = str(e)
if _is_connection_lost(error_msg):
log_progress("[!] 浏览器连接断开")
return {"stopped": True}
log_progress(f"[X] 邮箱填写失败: {e}")
log_progress(f"当前URL: {page.url}")
return None
# ========== 步骤 2: 选择 SEPA ==========
step_cb("选择 SEPA 支付方式...")
log_progress("[2] 选择 SEPA...")
time.sleep(1)
sepa_clicked = False
# 定位方式(按速度排序:属性选择器 > CSS > xpath
sepa_selectors = [
'@data-testid=sepa_debit-accordion-item-button', # 最快:属性选择器
'css:button[data-testid*="sepa"]', # 快CSS 模糊匹配
]
for selector in sepa_selectors:
try:
sepa_btn = page.ele(selector, timeout=2)
if sepa_btn:
page.run_js('arguments[0].click()', sepa_btn)
sepa_clicked = True
time.sleep(1)
break
except:
continue
if not sepa_clicked:
# 最后尝试 JS
try:
result = page.run_js('''
const btns = document.querySelectorAll('button');
for(let btn of btns) {
if(btn.innerText.includes('SEPA')) {
btn.click();
return true;
}
}
return false;
''')
if result:
sepa_clicked = True
time.sleep(1)
except:
pass
if not sepa_clicked:
log_progress("[X] SEPA 选择失败")
return None
# 验证 SEPA 是否真正展开(检查 IBAN 输入框是否出现)
try:
iban_check = page.ele('#iban', timeout=5)
if not iban_check:
log_progress("[X] SEPA 未展开")
return None
except:
log_progress("[X] SEPA 未展开")
return None
# ========== 步骤 3: 填写 IBAN ==========
step_cb("填写 IBAN...")
log_progress("[3] 填写 IBAN...")
try:
iban_input = page.ele('#iban', timeout=5)
if not iban_input:
log_progress("[X] IBAN 输入框未找到")
return None
iban_input.input(sepa_iban)
except Exception as e:
log_progress(f"[X] IBAN 填写失败: {e}")
return None
# ========== 步骤 4: 填写账户姓名 ==========
step_cb("填写账户姓名...")
log_progress("[4] 填写姓名...")
try:
name_input = page.ele('#billingName', timeout=3)
if not name_input:
name_input = page.ele('@name=billingName', timeout=2)
if name_input:
name_input.input(account_name)
except:
pass
# ========== 步骤 5: 填写地址 ==========
step_cb("填写账单地址...")
log_progress("[5] 填写地址...")
try:
addr_input = page.ele('#billingAddressLine1', timeout=1)
if not addr_input:
try:
manual_btn = page.ele('@data-testid=manual-address-entry', timeout=1)
if manual_btn:
manual_btn.click()
time.sleep(0.3)
except:
pass
addr_input = page.ele('#billingAddressLine1', timeout=2)
if addr_input:
addr_input.input(street)
postal_input = page.ele('#billingPostalCode', timeout=1)
if postal_input:
postal_input.input(postal_code)
city_input = page.ele('#billingLocality', timeout=1)
if city_input:
city_input.input(city)
page.actions.key_down('Escape').key_up('Escape')
except:
pass
# ========== 步骤 6: 勾选条款 ==========
step_cb("勾选服务条款...")
log_progress("[6] 勾选条款...")
try:
terms_checkbox = page.ele('#termsOfServiceConsentCheckbox', timeout=3)
if terms_checkbox:
terms_checkbox.click()
except:
pass
# ========== 步骤 7: 点击订阅 ==========
step_cb("提交订阅...")
log_progress("[7] 点击订阅...")
time.sleep(1)
# 多种方式尝试点击订阅按钮Linux 无头模式下需要更可靠的方式)
subscribe_clicked = False
# 方式1: 使用 DrissionPage 的 click
try:
subscribe_btn = page.ele('css:button[type="submit"]', timeout=5)
if subscribe_btn:
# 确保按钮可见并滚动到视图
page.run_js('arguments[0].scrollIntoView({block: "center"})', subscribe_btn)
time.sleep(0.5)
subscribe_btn.click()
subscribe_clicked = True
log_progress("[OK] 点击订阅按钮 (方式1)")
except Exception as e:
log_progress(f"[!] 方式1点击失败: {e}")
# 方式2: 使用 JS 点击
if not subscribe_clicked:
try:
result = page.run_js('''
const btn = document.querySelector('button[type="submit"]');
if (btn) {
btn.scrollIntoView({block: "center"});
btn.click();
return "clicked";
}
return "not found";
''')
if result == "clicked":
subscribe_clicked = True
log_progress("[OK] 点击订阅按钮 (方式2-JS)")
else:
log_progress(f"[!] 方式2: {result}")
except Exception as e:
log_progress(f"[!] 方式2点击失败: {e}")
# 方式3: 使用 JS dispatchEvent 模拟点击
if not subscribe_clicked:
try:
result = page.run_js('''
const btn = document.querySelector('button[type="submit"]');
if (btn) {
const event = new MouseEvent('click', {
bubbles: true,
cancelable: true,
view: window
});
btn.dispatchEvent(event);
return "dispatched";
}
return "not found";
''')
if result == "dispatched":
subscribe_clicked = True
log_progress("[OK] 点击订阅按钮 (方式3-dispatchEvent)")
except Exception as e:
log_progress(f"[!] 方式3点击失败: {e}")
# 记录当前 URL 用于调试
try:
log_progress(f"点击后URL: {page.url}")
except:
pass
# ========== 步骤 8: 等待支付成功 ==========
step_cb("等待支付处理...")
log_status("等待", "等待支付处理(超时90秒)...")
# 使用轮询方式等待支付成功
payment_success = False
start_time = time.time()
max_wait = 90 # 最多等待 90 秒
last_log_time = 0
last_url = ""
while time.time() - start_time < max_wait:
try:
current_url = page.url
# URL 变化时输出日志
if current_url != last_url:
log_progress(f"[URL变化] {current_url[:80]}...")
last_url = current_url
# 检查是否支付成功
if 'payments/success' in current_url or 'success-team' in current_url:
payment_success = True
log_status("成功", "[OK] 支付成功!")
break
# 检查是否有错误页面
if 'error' in current_url.lower() or 'failed' in current_url.lower():
log_status("失败", f"[X] 支付失败: {current_url}")
return None
# 检查页面上是否有错误提示Stripe 可能显示错误但不改变 URL
try:
error_elem = page.ele('css:[class*="error"], [class*="Error"], [role="alert"]', timeout=0.5)
if error_elem and error_elem.text:
error_text = error_elem.text[:100]
if error_text and 'error' in error_text.lower():
log_progress(f"[!] 页面错误: {error_text}")
except:
pass
except Exception as e:
# 页面加载中可能会抛出异常,记录但继续等待
log_progress(f"[!] 页面访问异常: {str(e)[:50]}")
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求")
return {"stopped": True}
# 每 10 秒输出一次日志
elapsed = int(time.time() - start_time)
if elapsed >= last_log_time + 10:
log_progress(f"[等待中] 已等待 {elapsed} 秒...")
last_log_time = elapsed
# 每 30 秒检查一次页面状态
if elapsed % 30 == 0:
try:
# 检查是否还在支付页
if 'pay.openai.com' in current_url or 'checkout.stripe.com' in current_url:
log_progress("[!] 仍在支付页,可能卡住了")
# 尝试再次点击提交按钮
try:
page.run_js('document.querySelector("button[type=submit]")?.click()')
log_progress("[!] 尝试重新点击提交按钮")
except:
pass
except:
pass
time.sleep(2)
if not payment_success:
log_status("超时", "[X] 支付未完成(超时)")
try:
log_progress(f"最终URL: {page.url}")
# 尝试获取页面截图信息(调试用)
try:
page_title = page.run_js('return document.title')
log_progress(f"页面标题: {page_title}")
except:
pass
except:
pass
return None
# ========== 步骤 9: 获取 token 和 account_id ==========
step_cb("获取 Token...")
log_status("获取", "正在获取 access token...")
time.sleep(1)
page.get("https://chatgpt.com/api/auth/session")
time.sleep(1)
try:
# 获取页面内容JSON
session_text = page.ele('tag:pre', timeout=5).text
import json
session_data = json.loads(session_text)
access_token = session_data.get('accessToken', '')
if access_token:
log_status("成功", f"Token: {access_token[:50]}...")
# 优先从 session 数据直接提取 account_id最快
step_cb("获取 Account ID...")
account_id = fetch_account_id_from_session(session_data)
# 如果 session 中没有,再通过 API 获取
if not account_id:
account_id = fetch_account_id(page, access_token)
if account_id:
log_progress(f"account_id: {account_id}")
return {
"token": access_token,
"account_id": account_id
}
else:
log_progress("[X] 未找到 accessToken")
return None
except Exception as e:
log_progress(f"[X] 获取 token 失败: {e}")
return None
except Exception as e:
error_msg = str(e)
# 只有连接断开才认为是停止请求,普通异常按错误处理
if _is_connection_lost(error_msg):
log_status("停止", "[!] 浏览器连接断开,支付流程已中断")
return {"stopped": True}
log_status("错误", f"[X] 支付流程异常: {e}")
return None
def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool = True, step_callback=None):
"""使用 API 会话的 cookies 注入浏览器完成支付 (协议模式专用)
Args:
reg: ChatGPTAPIRegister 对象
email: 邮箱
proxy: 代理地址
headless: 是否无头模式
step_callback: 步骤回调
Returns:
dict: {"token": ..., "account_id": ...} 或 None
"""
def step_cb(step):
if step_callback:
step_callback(step)
step_cb("获取支付页 URL...")
# 通过 API 获取支付页 URL
checkout_url = reg.get_checkout_url()
if not checkout_url:
log_status("失败", "无法获取支付页 URL")
return None
log_progress(f"[OK] 支付页: {checkout_url[:60]}...")
# 获取 cookies
cookies = reg.get_cookies()
log_status("Cookie", f"获取到 {len(cookies)} 个 cookies")
# 启动浏览器
step_cb("启动浏览器...")
temp_user_data = tempfile.mkdtemp(prefix="chrome_api_")
# 检测操作系统
is_linux = platform.system() == "Linux"
# 获取随机指纹
fingerprint = None
if RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
else:
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
co = ChromiumOptions()
co.set_argument('--no-first-run')
co.set_argument('--no-default-browser-check')
co.set_argument(f'--user-data-dir={temp_user_data}')
co.set_argument('--disable-blink-features=AutomationControlled')
co.set_argument('--disable-infobars')
co.set_argument('--disable-dev-shm-usage')
co.set_argument('--no-sandbox')
co.set_argument('--disable-gpu') # Linux 无头模式必需
co.set_argument('--incognito') # 无痕模式
co.set_argument('--lang=en-US') # 设置语言为英文Stripe 页面)
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
if headless:
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
else:
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
if proxy:
co.set_argument(f'--proxy-server={proxy}')
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
# 注意:不使用 --single-process可能导致 Stripe 页面异常
co.set_argument('--remote-debugging-port=0')
co.set_argument('--disable-background-networking')
co.set_argument('--disable-default-apps')
co.set_argument('--disable-sync')
co.set_argument('--metrics-recording-only')
co.set_argument('--mute-audio')
co.set_argument('--no-zygote') # 替代 single-process更稳定
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
log_status("浏览器", f"使用: {chrome_path}")
break
else:
co.auto_port(True)
co.set_local_port(random.randint(19222, 29999))
log_status("浏览器", f"正在启动 ({'无头' if headless else '有头'}模式)...")
page = ChromiumPage(co)
try:
# 注入指纹
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
# 先访问 chatgpt.com 注入 cookies
step_cb("注入登录状态...")
log_status("Cookie", "注入登录状态...")
page.get("https://chatgpt.com")
injected_count = 0
for cookie in cookies:
try:
if 'chatgpt.com' in cookie.get('domain', ''):
page.set.cookies({
'name': cookie['name'],
'value': cookie['value'],
'domain': cookie['domain'].lstrip('.'),
'path': cookie.get('path', '/'),
})
injected_count += 1
except:
pass
log_progress(f"[OK] 已注入 {injected_count} 个 cookies")
# 刷新页面确保 cookies 生效
time.sleep(1)
page.refresh()
time.sleep(1)
# 直接跳转到支付页
step_cb("跳转到支付页...")
log_status("订阅", "跳转到支付页...")
page.get(checkout_url)
try:
page.wait.url_change('pay.openai.com', timeout=15)
log_progress("[OK] 已跳转到支付页")
except:
time.sleep(2)
# 重新注入指纹(跳转到新域名后需要重新注入)
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
log_progress("[OK] 已重新注入指纹到支付页")
# 等待页面完全加载
time.sleep(2)
# 执行支付流程
step_cb("执行 SEPA 支付...")
result = run_payment_flow(page, email, step_cb)
return result
except Exception as e:
log_status("错误", f"浏览器流程异常: {e}")
return None
finally:
try:
page.quit()
except:
pass
# 清理临时目录
try:
import shutil
shutil.rmtree(temp_user_data, ignore_errors=True)
except:
pass
def fetch_account_id(page, access_token: str) -> str:
"""通过 API 获取 account_id (使用 requests 直接请求,更快)"""
log_status("获取", "正在获取 account_id...")
try:
# 直接使用 requests 请求 API比 page.get() + JS fetch 快得多
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
resp = requests.get(
"https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27",
headers=headers,
timeout=10
)
if resp.status_code == 200:
data = resp.json()
accounts = data.get("accounts", {})
# 优先查找 Team 账户
for acc_id, acc_info in accounts.items():
if acc_id == "default":
continue
account_data = acc_info.get("account", {})
plan_type = account_data.get("plan_type", "")
if "team" in plan_type.lower():
log_status("成功", f"获取到 account_id: {acc_id[:8]}...")
return acc_id
# 取第一个非 default 的
for acc_id in accounts.keys():
if acc_id != "default":
log_status("成功", f"获取到 account_id: {acc_id[:8]}...")
return acc_id
else:
log_progress(f"API 请求失败: {resp.status_code}")
except Exception as e:
log_progress(f"获取 account_id 失败: {e}")
return ""
def fetch_account_id_from_session(session_data: dict) -> str:
"""直接从 session 数据中提取 account_id (最快方式)"""
try:
account = session_data.get("account", {})
account_id = account.get("id", "")
if account_id:
log_status("成功", f"获取到 account_id: {account_id[:8]}...")
return account_id
except Exception as e:
log_progress(f"从 session 提取 account_id 失败: {e}")
return ""
def run_main_process():
# 检查必要配置
if not MAIL_API_TOKEN or not MAIL_API_BASE or not EMAIL_DOMAINS:
print("\n" + "="*60)
print("[X] 配置错误: 请在 config.toml 中配置 [autogptplus] 段")
print(" - mail_api_token: Cloud Mail API Token")
print(" - mail_api_base: Cloud Mail API 地址")
print(" - email_domains: 可用邮箱域名列表")
print("="*60 + "\n")
return
# 清理可能残留的 Chrome 调试进程
cleanup_chrome_processes()
# === 1. 生成 15 位随机账号 + 同密码 ===
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
domains = get_email_domains()
if not domains:
print("\n" + "="*60)
print("[X] 配置错误: 没有可用的邮箱域名")
print(" 请在 config.toml 中配置 email_domains 或通过 Bot 添加")
print("="*60 + "\n")
return
email_domain = random.choice(domains)
email = f"{random_str}{email_domain}"
# 生成符合要求的密码:大小写字母+数字+特殊字符至少12位
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
''.join(random.choices(string.ascii_lowercase, k=8)) + \
''.join(random.choices(string.digits, k=2)) + \
random.choice('!@#$%')
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
print("\n" + "="*60)
log_status("初始化", f"生成账号: {email}")
log_status("初始化", f"设置密码: {password}")
print("="*60 + "\n")
# 检测操作系统
is_linux = platform.system() == "Linux"
# 获取随机指纹
fingerprint = None
if RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
log_status("指纹", f"{fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}")
else:
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
log_status("指纹", "使用默认指纹 (RTX 3060)")
# 配置 DrissionPage - 与项目 browser_automation.py 保持一致
co = ChromiumOptions()
co.set_argument('--no-first-run') # 跳过首次运行
co.set_argument('--disable-infobars')
co.set_argument('--incognito') # 无痕模式
co.set_argument('--disable-gpu') # 减少资源占用
co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题
co.set_argument('--no-sandbox') # 服务器环境需要
co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征
co.set_argument('--lang=zh-CN') # 设置语言为中文简体
co.set_argument(f'--user-agent={fingerprint["user_agent"]}') # 设置 User-Agent
# Linux 服务器特殊配置
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
co.set_argument('--single-process') # 某些 Linux 环境需要
co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口
# 尝试查找 Chrome/Chromium 路径
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
log_status("浏览器", f"使用浏览器: {chrome_path}")
break
else:
co.auto_port(True) # Windows 使用自动分配端口
co.set_local_port(random.randint(19222, 29999)) # 备用:手动设置随机端口
# 无头模式 (服务器运行)
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
log_status("浏览器", f"正在启动浏览器 (无头模式, {'Linux' if is_linux else 'Windows'})...")
try:
page = ChromiumPage(co)
# 注入指纹伪装
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
except Exception as e:
log_status("浏览器", f"首次启动失败: {e},尝试清理后重试...")
# 清理残留进程后重试
cleanup_chrome_processes()
time.sleep(2)
# 重新配置
co2 = ChromiumOptions()
co2.set_argument('--no-first-run')
co2.set_argument('--disable-infobars')
co2.set_argument('--incognito')
co2.set_argument('--disable-gpu')
co2.set_argument('--disable-dev-shm-usage')
co2.set_argument('--no-sandbox')
co2.set_argument('--disable-blink-features=AutomationControlled')
co2.set_argument('--lang=zh-CN')
co2.set_argument(f'--user-agent={fingerprint["user_agent"]}')
co2.set_argument('--headless=new')
co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
if is_linux:
co2.set_argument('--disable-software-rasterizer')
co2.set_argument('--disable-extensions')
co2.set_argument('--disable-setuid-sandbox')
co2.set_argument('--single-process')
co2.set_argument('--remote-debugging-port=0')
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co2.set_browser_path(chrome_path)
break
else:
co2.set_local_port(random.randint(30000, 39999))
page = ChromiumPage(co2)
# 注入指纹伪装
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
try:
# === 注册流程 ===
log_status("步骤 1/5", "打开 ChatGPT 注册页...")
page.get(TARGET_URL)
log_progress(f"当前URL: {page.url}")
# 使用 data-testid 定位登录按钮
login_btn = page.ele('@data-testid=login-button', timeout=30)
if not login_btn:
login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
login_btn.click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
log_progress("填入邮箱...")
email_input = page.ele('@name=email', timeout=30)
email_input.input(email)
page.ele('xpath://button[@type="submit"]').click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
log_progress("填入密码...")
password_input = page.ele('xpath://input[@type="password"]', timeout=30)
password_input.input(password)
page.ele('xpath://button[@type="submit"]').click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
log_status("步骤 2/5", "等待邮件 (All Mail)...")
# === 接码与验证 ===
verify_data = get_verification_content(email)
if verify_data:
log_status("步骤 3/5", "执行验证...")
if verify_data['type'] == 'link':
page.new_tab(verify_data['val'])
time.sleep(5)
log_progress(f"当前URL: {page.url}")
elif verify_data['type'] == 'code':
code = verify_data['val']
log_progress(f"填入验证码: {code}")
try:
code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
code_input.input(code)
# 点击继续(使用 type=submit
time.sleep(1)
try:
log_progress("尝试点击继续按钮...")
continue_btn = page.ele('css:button[type="submit"]', timeout=5)
if continue_btn:
continue_btn.click()
except:
log_progress("未找到按钮或已自动跳转...")
time.sleep(2)
log_progress(f"当前URL: {page.url}")
except Exception as e:
log_progress(f"验证码填入异常: {e}")
# === 资料填写 (姓名+生日) ===
log_status("步骤 4/5", "进入信息填写页...")
log_progress(f"当前URL: {page.url}")
try:
name_input = page.ele('@name=name', timeout=20)
name_input.input(real_name)
log_progress(f"姓名: {real_name}")
# 生成随机生日
birth_year, birth_month, birth_day = generate_random_birthday()
# 优先使用精确输入,失败则使用备用方案
if not _input_birthday_precise(page, birth_year, birth_month, birth_day):
_input_birthday_fallback(page, birth_year, birth_month, birth_day)
time.sleep(1.5)
# 点击继续/完成注册(使用 type=submit
final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
final_reg_btn.click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
# === 检测提交后是否有错误或需要额外操作 ===
submit_success = False
for check_attempt in range(10):
current_url = page.url
# 检查是否已跳转出 about-you 页面
if 'about-you' not in current_url:
log_progress(f"[OK] 页面已跳转: {current_url[:50]}...")
submit_success = True
break
# 检查是否有错误提示
try:
error_elem = page.ele('css:[role="alert"], [class*="error"], [class*="Error"]', timeout=1)
if error_elem and error_elem.text:
log_progress(f"[!] 发现错误提示: {error_elem.text[:50]}")
except:
pass
# 检查是否有未勾选的 checkbox如服务条款
try:
unchecked = page.ele('css:input[type="checkbox"]:not(:checked)', timeout=1)
if unchecked:
log_progress("发现未勾选的选项,尝试勾选...")
unchecked.click()
time.sleep(0.5)
# 重新点击提交
final_reg_btn = page.ele('css:button[type="submit"]', timeout=5)
if final_reg_btn:
final_reg_btn.click()
time.sleep(2)
except:
pass
# 检查是否有 CAPTCHA
try:
captcha = page.ele('css:iframe[src*="captcha"], iframe[src*="recaptcha"], [class*="captcha"]', timeout=1)
if captcha:
log_progress("[!] 检测到 CAPTCHA需要人工处理或等待...")
except:
pass
time.sleep(1)
if not submit_success:
log_progress(f"[!] 提交后仍在 about-you 页面当前URL: {page.url}")
# =======================================================
# 【关键节点】等待进入主页后再执行 JS 跳转到支付页
# =======================================================
log_status("订阅", "等待进入 ChatGPT 主页...")
# 快速检测循环(减少等待时间)
entered_main = False
for i in range(20): # 最多等待 10 秒
current_url = page.url
# 更宽松的判断:只要不在 auth/login 页面就认为进入了主页
if 'chatgpt.com' in current_url:
if '/auth' not in current_url and '/login' not in current_url and 'auth0' not in current_url:
log_progress(f"[OK] 已进入主页: {current_url}")
entered_main = True
break
time.sleep(0.5) # 减少等待间隔
if not entered_main:
log_progress("[!] 等待主页超时,尝试继续...")
log_progress(f"当前URL: {page.url}")
# 尝试直接访问主页
page.get("https://chatgpt.com/")
time.sleep(2)
log_progress(f"跳转后URL: {page.url}")
log_status("订阅", "执行 JS 跳转到支付页...")
# 直接执行 JS 跳转到支付页无需额外等待JS 会自动获取 session
checkout_js = '''
(async function(){
try {
const t = await(await fetch("/api/auth/session")).json();
if(!t.accessToken){
return "请先登录ChatGPT";
}
const p = {
plan_name: "chatgptteamplan",
team_plan_data: {
workspace_name: "Sepa",
price_interval: "month",
seat_quantity: 5
},
billing_details: {
country: "DE",
currency: "EUR"
},
promo_campaign: {
promo_campaign_id: "team-1-month-free",
is_coupon_from_query_param: true
},
checkout_ui_mode: "redirect"
};
const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
method: "POST",
headers: {
Authorization: "Bearer " + t.accessToken,
"Content-Type": "application/json"
},
body: JSON.stringify(p)
});
const d = await r.json();
if(d.url){
window.location.href = d.url;
return "success";
} else {
return "提取失败:" + (d.detail || JSON.stringify(d));
}
} catch(e) {
return "发生错误:" + e;
}
})();
'''
result = page.run_js(checkout_js)
log_progress(f"JS 执行结果: {result}")
# 等待跳转到支付页(使用 URL 检测代替固定等待)
try:
page.wait.url_change('pay.openai.com', timeout=15)
log_progress("[OK] 已跳转到支付页")
log_progress(f"支付页URL: {page.url}")
except:
time.sleep(1)
log_progress(f"当前URL: {page.url}")
# 执行支付流程
result = run_payment_flow(page, email)
# 保存账号信息
if result and result.get("token"):
save_account(
email,
password,
result["token"],
result.get("account_id", "")
)
except Exception as e:
log_status("崩溃", f"注册/订阅转换阶段异常: {e}")
else:
log_status("失败", "未能找到验证码。")
except Exception as e:
log_status("崩溃", f"全局错误: {e}")
print("\n" + "="*60)
input("按回车键退出...")
page.quit()
def run_single_registration(progress_callback=None, step_callback=None) -> dict:
"""执行单次注册流程 (供 Bot 调用)
Args:
progress_callback: 进度回调函数 (message: str) - 用于日志
step_callback: 步骤回调函数 (step: str) - 用于更新 Bot 显示的当前步骤
Returns:
dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str}
"""
def log_cb(msg):
if progress_callback:
progress_callback(msg)
else:
log_progress(msg)
def step_cb(step):
if step_callback:
step_callback(step)
# 检查必要配置
if not MAIL_API_TOKEN or not MAIL_API_BASE:
return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"}
# 检查域名
domains = get_email_domains()
if not domains:
return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"}
# 检查 IBAN
ibans = get_sepa_ibans()
if not ibans:
return {"success": False, "error": "没有可用的 IBAN请先通过 /iban_add 导入"}
step_cb("生成账号信息...")
# 生成账号信息
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
email_domain = random.choice(domains)
email = f"{random_str}{email_domain}"
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
''.join(random.choices(string.ascii_lowercase, k=8)) + \
''.join(random.choices(string.digits, k=2)) + \
random.choice('!@#$%')
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
log_status("初始化", f"生成账号: {email}")
log_status("初始化", f"设置密码: {password}")
# 检测操作系统
is_linux = platform.system() == "Linux"
step_cb("启动浏览器...")
# 获取随机指纹
fingerprint = None
if RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
log_status("指纹", f"已注入: {fingerprint['webgl_renderer'][:40]}...")
else:
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
# 配置浏览器
co = ChromiumOptions()
co.set_argument('--no-first-run')
co.set_argument('--disable-infobars')
co.set_argument('--incognito')
co.set_argument('--disable-gpu')
co.set_argument('--disable-dev-shm-usage')
co.set_argument('--no-sandbox')
co.set_argument('--disable-blink-features=AutomationControlled')
co.set_argument('--lang=zh-CN')
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
co.set_argument('--single-process')
co.set_argument('--remote-debugging-port=0')
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
break
else:
co.auto_port(True)
co.set_local_port(random.randint(19222, 29999))
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
page = None
try:
page = ChromiumPage(co)
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
except Exception as e:
log_status("浏览器", f"启动失败: {e},尝试清理后重试...")
cleanup_chrome_processes()
time.sleep(2)
# 重新配置
co2 = ChromiumOptions()
co2.set_argument('--no-first-run')
co2.set_argument('--disable-infobars')
co2.set_argument('--incognito')
co2.set_argument('--disable-gpu')
co2.set_argument('--disable-dev-shm-usage')
co2.set_argument('--no-sandbox')
co2.set_argument('--disable-blink-features=AutomationControlled')
co2.set_argument('--lang=zh-CN')
co2.set_argument(f'--user-agent={fingerprint["user_agent"]}')
co2.set_argument('--headless=new')
co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
if is_linux:
co2.set_argument('--disable-software-rasterizer')
co2.set_argument('--disable-extensions')
co2.set_argument('--disable-setuid-sandbox')
co2.set_argument('--single-process')
co2.set_argument('--remote-debugging-port=0')
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co2.set_browser_path(chrome_path)
break
else:
co2.auto_port(True)
co2.set_local_port(random.randint(30000, 39999))
try:
page = ChromiumPage(co2)
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
except Exception as e2:
return {"success": False, "error": f"浏览器启动失败: {e2}", "account": email, "password": password}
try:
step_cb("打开注册页面...")
log_status("步骤 1/5", "打开 ChatGPT 注册页...")
page.get(TARGET_URL)
log_progress(f"当前URL: {page.url}")
step_cb("点击登录按钮...")
login_btn = page.ele('@data-testid=login-button', timeout=30)
if not login_btn:
login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
login_btn.click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
step_cb("填写邮箱...")
log_progress("填入邮箱...")
email_input = page.ele('@name=email', timeout=30)
email_input.input(email)
page.ele('xpath://button[@type="submit"]').click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
step_cb("填写密码...")
log_progress("填入密码...")
password_input = page.ele('xpath://input[@type="password"]', timeout=30)
password_input.input(password)
page.ele('xpath://button[@type="submit"]').click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
step_cb("等待验证邮件...")
log_status("步骤 2/5", "等待邮件 (All Mail)...")
verify_data = get_verification_content(email)
if not verify_data:
return {"success": False, "error": "未能获取验证码", "account": email, "password": password}
step_cb("执行邮箱验证...")
log_status("步骤 3/5", "执行验证...")
if verify_data['type'] == 'link':
page.new_tab(verify_data['val'])
time.sleep(5)
log_progress(f"当前URL: {page.url}")
elif verify_data['type'] == 'code':
code = verify_data['val']
log_progress(f"填入验证码: {code}")
code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
code_input.input(code)
time.sleep(1)
try:
log_progress("尝试点击继续按钮...")
continue_btn = page.ele('css:button[type="submit"]', timeout=5)
if continue_btn:
continue_btn.click()
except:
log_progress("未找到按钮或已自动跳转...")
time.sleep(2)
log_progress(f"当前URL: {page.url}")
# 资料填写
step_cb("填写个人信息...")
log_status("步骤 4/5", "进入信息填写页...")
log_progress(f"当前URL: {page.url}")
name_input = page.ele('@name=name', timeout=20)
name_input.input(real_name)
log_progress(f"姓名: {real_name}")
# 生成随机生日
birth_year, birth_month, birth_day = generate_random_birthday()
# 优先使用精确输入,失败则使用备用方案
if not _input_birthday_precise(page, birth_year, birth_month, birth_day):
_input_birthday_fallback(page, birth_year, birth_month, birth_day)
time.sleep(1.5)
final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
final_reg_btn.click()
time.sleep(2)
log_progress(f"当前URL: {page.url}")
# === 检测提交后是否有错误或需要额外操作 ===
submit_success = False
for check_attempt in range(10):
current_url = page.url
# 检查是否已跳转出 about-you 页面
if 'about-you' not in current_url:
log_progress(f"[OK] 页面已跳转: {current_url[:50]}...")
submit_success = True
break
# 检查是否有错误提示
try:
error_elem = page.ele('css:[role="alert"], [class*="error"], [class*="Error"]', timeout=1)
if error_elem and error_elem.text:
log_progress(f"[!] 发现错误提示: {error_elem.text[:50]}")
except:
pass
# 检查是否有未勾选的 checkbox如服务条款
try:
unchecked = page.ele('css:input[type="checkbox"]:not(:checked)', timeout=1)
if unchecked:
log_progress("发现未勾选的选项,尝试勾选...")
unchecked.click()
time.sleep(0.5)
# 重新点击提交
final_reg_btn = page.ele('css:button[type="submit"]', timeout=5)
if final_reg_btn:
final_reg_btn.click()
time.sleep(2)
except:
pass
# 检查是否有 CAPTCHA
try:
captcha = page.ele('css:iframe[src*="captcha"], iframe[src*="recaptcha"], [class*="captcha"]', timeout=1)
if captcha:
log_progress("[!] 检测到 CAPTCHA需要人工处理或等待...")
except:
pass
time.sleep(1)
if not submit_success:
log_progress(f"[!] 提交后仍在 about-you 页面当前URL: {page.url}")
# 等待进入主页 - 快速检测
step_cb("等待进入主页...")
log_status("订阅", "等待进入 ChatGPT 主页...")
# 快速检测循环(减少等待时间)
entered_main = False
for i in range(20): # 最多等待 10 秒
current_url = page.url
# 更宽松的判断:只要不在 auth/login 页面就认为进入了主页
if 'chatgpt.com' in current_url:
if '/auth' not in current_url and '/login' not in current_url and 'auth0' not in current_url:
log_progress(f"[OK] 已进入主页: {current_url}")
entered_main = True
break
time.sleep(0.5) # 减少等待间隔
if not entered_main:
log_progress("[!] 等待主页超时,尝试继续...")
log_progress(f"当前URL: {page.url}")
# 尝试直接访问主页
page.get("https://chatgpt.com/")
time.sleep(2)
log_progress(f"跳转后URL: {page.url}")
# 跳转到支付页
step_cb("跳转到支付页...")
log_status("订阅", "执行 JS 跳转到支付页...")
# 直接执行 JS 跳转到支付页(无需额外等待)
checkout_js = '''
(async function(){
try {
const t = await(await fetch("/api/auth/session")).json();
if(!t.accessToken){ return "请先登录ChatGPT"; }
const p = {
plan_name: "chatgptteamplan",
team_plan_data: { workspace_name: "Sepa", price_interval: "month", seat_quantity: 5 },
billing_details: { country: "DE", currency: "EUR" },
promo_campaign: { promo_campaign_id: "team-1-month-free", is_coupon_from_query_param: true },
checkout_ui_mode: "redirect"
};
const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
method: "POST",
headers: { Authorization: "Bearer " + t.accessToken, "Content-Type": "application/json" },
body: JSON.stringify(p)
});
const d = await r.json();
if(d.url){ window.location.href = d.url; return "success"; }
else { return "提取失败:" + (d.detail || JSON.stringify(d)); }
} catch(e) { return "发生错误:" + e; }
})();
'''
result = page.run_js(checkout_js)
log_progress(f"JS 执行结果: {result}")
# 等待跳转到支付页
try:
page.wait.url_change('pay.openai.com', timeout=15)
log_progress("[OK] 已跳转到支付页")
log_progress(f"支付页URL: {page.url}")
except:
time.sleep(1)
log_progress(f"当前URL: {page.url}")
# 执行支付流程
step_cb("执行 SEPA 支付...")
result = run_payment_flow(page, email, step_cb)
if result and result.get("stopped"):
# 被 /stop 命令中断
log_status("停止", "[!] 注册被用户停止")
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
elif result and result.get("token"):
step_cb("注册成功!")
log_status("完成", "[OK] 注册成功!")
return {
"success": True,
"account": email,
"password": password,
"token": result["token"],
"account_id": result.get("account_id", "")
}
else:
log_status("失败", "注册失败: 支付流程失败")
return {"success": False, "error": "支付流程失败", "account": email, "password": password}
except Exception as e:
error_msg = str(e)
# 只有连接断开才认为是停止请求
if _is_connection_lost(error_msg):
log_status("停止", "[!] 浏览器连接断开")
return {"success": False, "error": "浏览器连接断开", "stopped": True, "account": email, "password": password}
log_status("错误", f"注册异常: {e}")
return {"success": False, "error": str(e), "account": email, "password": password}
finally:
if page:
try:
page.quit()
except:
pass
cleanup_chrome_processes()
def run_single_registration_api(progress_callback=None, step_callback=None, proxy: str = None) -> dict:
"""执行单次注册流程 - 协议模式 (API + Cookie 注入浏览器)
Args:
progress_callback: 进度回调函数 (message: str)
step_callback: 步骤回调函数 (step: str)
proxy: 代理地址
Returns:
dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str}
"""
def log_cb(msg):
if progress_callback:
progress_callback(msg)
else:
log_progress(msg)
def step_cb(step):
if step_callback:
step_callback(step)
# 检查协议模式是否可用
if not API_MODE_AVAILABLE:
return {"success": False, "error": "协议模式不可用,请安装 curl_cffi: pip install curl_cffi"}
# 检查必要配置
if not MAIL_API_TOKEN or not MAIL_API_BASE:
return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"}
# 检查域名
domains = get_email_domains()
if not domains:
return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"}
# 检查 IBAN
ibans = get_sepa_ibans()
if not ibans:
return {"success": False, "error": "没有可用的 IBAN请先通过 /iban_add 导入"}
step_cb("生成账号信息...")
# 生成账号信息
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
email_domain = random.choice(domains)
email = f"{random_str}{email_domain}"
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
''.join(random.choices(string.ascii_lowercase, k=8)) + \
''.join(random.choices(string.digits, k=2)) + \
random.choice('!@#$%')
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
# 生成生日
year = random.randint(2000, 2004)
month = random.randint(1, 12)
if month in [1, 3, 5, 7, 8, 10, 12]:
max_day = 31
elif month in [4, 6, 9, 11]:
max_day = 30
else:
max_day = 29 if year % 4 == 0 else 28
day = random.randint(1, max_day)
birthdate = f"{year}-{month:02d}-{day:02d}"
log_status("初始化", f"生成账号: {email}")
log_status("初始化", f"设置密码: {password}")
log_status("初始化", f"姓名: {real_name} | 生日: {birthdate}")
log_status("模式", "协议模式 (API + Cookie 注入)")
# 使用配置的代理或传入的代理
use_proxy = proxy or API_PROXY or None
if use_proxy:
log_status("代理", f"使用代理: {use_proxy}")
try:
# 阶段 1: API 注册
step_cb("API 快速注册...")
log_status("阶段 1", "========== API 快速注册 ==========")
reg = api_register_flow(
email=email,
password=password,
real_name=real_name,
birthdate=birthdate,
mail_api_base=MAIL_API_BASE,
mail_api_token=MAIL_API_TOKEN,
proxy=use_proxy,
progress_callback=log_cb
)
if not reg:
log_status("失败", "API 注册失败")
return {"success": False, "error": "API 注册失败", "account": email, "password": password}
log_status("完成", "[OK] API 注册成功!")
# 阶段 2: Cookie 注入浏览器 + 支付
step_cb("Cookie 注入浏览器...")
log_status("阶段 2", "========== Cookie 注入浏览器 + 订阅支付 ==========")
result = browser_pay_with_cookies(
reg=reg,
email=email,
proxy=use_proxy,
headless=True,
step_callback=step_cb
)
if result and result.get("stopped"):
log_status("停止", "[!] 注册被用户停止")
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
elif result and result.get("token"):
step_cb("注册成功!")
log_status("完成", "[OK] 全部流程完成!")
return {
"success": True,
"account": email,
"password": password,
"token": result["token"],
"account_id": result.get("account_id", "")
}
# 如果 Cookie 注入失败,尝试 API 登录方式
log_status("重试", "Cookie 注入失败,尝试 API 登录...")
step_cb("尝试 API 登录...")
reg2 = api_login_flow(
email=email,
password=password,
proxy=use_proxy,
progress_callback=log_cb
)
if reg2:
result = browser_pay_with_cookies(
reg=reg2,
email=email,
proxy=use_proxy,
headless=True,
step_callback=step_cb
)
if result and result.get("token"):
step_cb("注册成功!")
log_status("完成", "[OK] 全部流程完成!")
return {
"success": True,
"account": email,
"password": password,
"token": result["token"],
"account_id": result.get("account_id", "")
}
log_status("失败", "注册成功但支付/获取token失败")
return {"success": False, "error": "支付流程失败", "account": email, "password": password}
except ShutdownRequested:
log_status("停止", "[!] 用户请求停止")
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
except Exception as e:
error_msg = str(e)
# 只有连接断开才认为是停止请求
if _is_connection_lost(error_msg):
log_status("停止", "[!] 浏览器连接断开")
return {"success": False, "error": "浏览器连接断开", "stopped": True, "account": email, "password": password}
log_status("错误", f"注册异常: {e}")
return {"success": False, "error": str(e), "account": email, "password": password}
finally:
cleanup_chrome_processes()
def run_single_registration_auto(progress_callback=None, step_callback=None, mode: str = None) -> dict:
"""自动选择模式执行注册
Args:
progress_callback: 进度回调
step_callback: 步骤回调
mode: 强制指定模式 ("browser" / "api")None 则使用配置
Returns:
dict: 注册结果
"""
use_mode = mode or REGISTER_MODE
if use_mode == "api":
if not API_MODE_AVAILABLE:
log_status("警告", "协议模式不可用,回退到浏览器模式")
return run_single_registration(progress_callback, step_callback)
return run_single_registration_api(progress_callback, step_callback)
else:
return run_single_registration(progress_callback, step_callback)
def get_register_mode() -> str:
"""获取当前注册模式"""
return REGISTER_MODE
def set_register_mode(mode: str) -> bool:
"""设置注册模式 (运行时)
Args:
mode: "browser""api"
Returns:
bool: 是否设置成功
"""
global REGISTER_MODE
if mode in ("browser", "api"):
if mode == "api" and not API_MODE_AVAILABLE:
return False
REGISTER_MODE = mode
return True
return False
def is_api_mode_supported() -> bool:
"""检查协议模式是否支持"""
return API_MODE_AVAILABLE
if __name__ == "__main__":
run_main_process()