2377 lines
88 KiB
Python
2377 lines
88 KiB
Python
"""
|
||
Author: muyyg
|
||
Project: Subscription Automation (DrissionPage Version)
|
||
Created: 2026-01-12
|
||
Version: 3.1-hybrid (支持协议模式)
|
||
|
||
模式说明:
|
||
- browser: 浏览器自动化模式 (默认),全程使用 DrissionPage 浏览器自动化
|
||
- api: 协议模式,使用 API 快速完成注册,仅支付环节使用浏览器
|
||
"""
|
||
|
||
import time
|
||
import random
|
||
import string
|
||
import re
|
||
import sys
|
||
import os
|
||
import platform
|
||
import subprocess
|
||
import tempfile
|
||
import requests
|
||
from pathlib import Path
|
||
from DrissionPage import ChromiumPage, ChromiumOptions
|
||
|
||
# 导入协议模式模块
|
||
try:
|
||
from api_register import (
|
||
ChatGPTAPIRegister,
|
||
api_register_flow,
|
||
api_login_flow,
|
||
is_api_mode_available,
|
||
get_verification_code_api,
|
||
)
|
||
API_MODE_AVAILABLE = is_api_mode_available()
|
||
except ImportError:
|
||
API_MODE_AVAILABLE = False
|
||
ChatGPTAPIRegister = None
|
||
api_register_flow = None
|
||
api_login_flow = None
|
||
|
||
# ================= 配置加载 =================
|
||
try:
|
||
import tomllib
|
||
except ImportError:
|
||
try:
|
||
import tomli as tomllib
|
||
except ImportError:
|
||
tomllib = None
|
||
|
||
BASE_DIR = Path(__file__).parent
|
||
CONFIG_FILE = BASE_DIR / "config.toml"
|
||
|
||
def _load_config():
|
||
"""从 config.toml 加载配置"""
|
||
if tomllib is None or not CONFIG_FILE.exists():
|
||
return {}
|
||
try:
|
||
with open(CONFIG_FILE, "rb") as f:
|
||
return tomllib.load(f)
|
||
except Exception:
|
||
return {}
|
||
|
||
_cfg = _load_config()
|
||
_autogptplus = _cfg.get("autogptplus", {})
|
||
|
||
# ================= 核心配置区域 =================
|
||
# 从 config.toml [autogptplus] 读取,如未配置则使用默认值
|
||
|
||
# 1. 管理员 Token
|
||
MAIL_API_TOKEN = _autogptplus.get("mail_api_token", "")
|
||
|
||
# 2. 你的域名后缀(随机选择)
|
||
EMAIL_DOMAINS = _autogptplus.get("email_domains", [])
|
||
|
||
# 3. Cloud-Mail 部署地址
|
||
MAIL_API_BASE = _autogptplus.get("mail_api_base", "")
|
||
|
||
# 4. 接口路径 (邮件查询)
|
||
MAIL_API_PATH = "/api/public/emailList"
|
||
|
||
# 5. SEPA IBAN 列表 (从配置文件读取)
|
||
SEPA_IBANS = _autogptplus.get("sepa_ibans", [])
|
||
|
||
# 6. 随机指纹开关
|
||
RANDOM_FINGERPRINT = _autogptplus.get("random_fingerprint", True)
|
||
|
||
# 7. 注册模式: "browser" (浏览器自动化) 或 "api" (协议模式)
|
||
# 默认使用 API 模式(更快),如果 curl_cffi 不可用则自动回退到浏览器模式
|
||
REGISTER_MODE = _autogptplus.get("register_mode", "api")
|
||
|
||
# 8. 协议模式代理 (仅协议模式使用)
|
||
API_PROXY = _autogptplus.get("api_proxy", "")
|
||
|
||
# ================= 浏览器指纹 =================
|
||
FINGERPRINTS = [
|
||
# NVIDIA 显卡
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 2560, "height": 1440}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4080 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 3840, "height": 2160}
|
||
},
|
||
# AMD 显卡
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (AMD)",
|
||
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6800 XT Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 2560, "height": 1440}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (AMD)",
|
||
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 7900 XTX Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 3840, "height": 2160}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (AMD)",
|
||
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6700 XT Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
},
|
||
# Intel 显卡
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (Intel)",
|
||
"webgl_renderer": "ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (Intel)",
|
||
"webgl_renderer": "ANGLE (Intel, Intel(R) Iris Xe Graphics Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (Intel)",
|
||
"webgl_renderer": "ANGLE (Intel, Intel(R) Arc A770 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 2560, "height": 1440}
|
||
},
|
||
# 笔记本配置
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3050 Laptop GPU Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
},
|
||
{
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 Laptop GPU Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 2560, "height": 1600}
|
||
},
|
||
]
|
||
|
||
def get_random_fingerprint() -> dict:
|
||
"""随机获取一个浏览器指纹"""
|
||
return random.choice(FINGERPRINTS)
|
||
|
||
def inject_fingerprint(page, fingerprint: dict):
|
||
"""注入浏览器指纹伪装脚本"""
|
||
global _last_log_message, _last_log_time
|
||
try:
|
||
webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. (NVIDIA)")
|
||
webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)")
|
||
plat = fingerprint.get("platform", "Win32")
|
||
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
|
||
|
||
js_script = f'''
|
||
// 伪装 WebGL 指纹
|
||
const getParameterProxyHandler = {{
|
||
apply: function(target, thisArg, args) {{
|
||
const param = args[0];
|
||
if (param === 37445) {{ return "{webgl_vendor}"; }}
|
||
if (param === 37446) {{ return "{webgl_renderer}"; }}
|
||
return Reflect.apply(target, thisArg, args);
|
||
}}
|
||
}};
|
||
const originalGetParameter = WebGLRenderingContext.prototype.getParameter;
|
||
WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler);
|
||
if (typeof WebGL2RenderingContext !== 'undefined') {{
|
||
const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter;
|
||
WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler);
|
||
}}
|
||
// 伪装 platform
|
||
Object.defineProperty(navigator, 'platform', {{ get: () => "{plat}" }});
|
||
// 伪装屏幕分辨率
|
||
Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]} }});
|
||
Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]} }});
|
||
Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]} }});
|
||
Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]} }});
|
||
// 隐藏 webdriver 特征
|
||
Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined }});
|
||
// 伪装 languages
|
||
Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"] }});
|
||
// 伪装 plugins
|
||
Object.defineProperty(navigator, 'plugins', {{
|
||
get: () => [
|
||
{{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }},
|
||
{{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }},
|
||
{{ name: "Native Client", filename: "internal-nacl-plugin" }}
|
||
]
|
||
}});
|
||
'''
|
||
page.run_js(js_script)
|
||
|
||
# 获取浏览器语言设置
|
||
try:
|
||
browser_lang = page.run_js('return navigator.language || navigator.userLanguage || "unknown"')
|
||
except:
|
||
browser_lang = "unknown"
|
||
|
||
# 避免重复日志:1秒内相同消息不重复输出
|
||
current_time = time.time()
|
||
log_msg = f"已注入: {webgl_renderer} | {screen['width']}x{screen['height']} | 语言: {browser_lang}"
|
||
if log_msg != _last_log_message or current_time - _last_log_time > 1:
|
||
log_status("指纹", log_msg)
|
||
_last_log_message = log_msg
|
||
_last_log_time = current_time
|
||
except Exception as e:
|
||
log_status("指纹", f"注入失败: {e}")
|
||
|
||
# ================= IBAN 管理函数 =================
|
||
IBAN_FILE = BASE_DIR / "sepa_ibans.txt"
|
||
|
||
def load_ibans_from_file():
|
||
"""从文件加载 IBAN 列表"""
|
||
if not IBAN_FILE.exists():
|
||
return []
|
||
try:
|
||
with open(IBAN_FILE, "r", encoding="utf-8") as f:
|
||
ibans = [line.strip() for line in f if line.strip() and line.strip().startswith("DE")]
|
||
return ibans
|
||
except Exception:
|
||
return []
|
||
|
||
def save_ibans_to_file(ibans: list):
|
||
"""保存 IBAN 列表到文件"""
|
||
try:
|
||
with open(IBAN_FILE, "w", encoding="utf-8") as f:
|
||
f.write("\n".join(ibans))
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
def get_sepa_ibans():
|
||
"""获取 SEPA IBAN 列表 (优先从文件读取)"""
|
||
file_ibans = load_ibans_from_file()
|
||
if file_ibans:
|
||
return file_ibans
|
||
return SEPA_IBANS
|
||
|
||
def add_sepa_ibans(new_ibans: list) -> tuple:
|
||
"""添加 IBAN 到列表
|
||
|
||
Args:
|
||
new_ibans: 新的 IBAN 列表
|
||
|
||
Returns:
|
||
tuple: (添加数量, 跳过数量, 当前总数)
|
||
"""
|
||
current = set(load_ibans_from_file())
|
||
added = 0
|
||
skipped = 0
|
||
|
||
for iban in new_ibans:
|
||
iban = iban.strip().upper()
|
||
if not iban or not iban.startswith("DE"):
|
||
continue
|
||
if iban in current:
|
||
skipped += 1
|
||
else:
|
||
current.add(iban)
|
||
added += 1
|
||
|
||
save_ibans_to_file(sorted(current))
|
||
return added, skipped, len(current)
|
||
|
||
def clear_sepa_ibans():
|
||
"""清空 IBAN 列表"""
|
||
if IBAN_FILE.exists():
|
||
IBAN_FILE.unlink()
|
||
return True
|
||
|
||
# ================= 域名管理函数 =================
|
||
DOMAIN_FILE = BASE_DIR / "email_domains.txt"
|
||
|
||
def load_domains_from_file():
|
||
"""从文件加载域名列表"""
|
||
if not DOMAIN_FILE.exists():
|
||
return []
|
||
try:
|
||
with open(DOMAIN_FILE, "r", encoding="utf-8") as f:
|
||
domains = [line.strip() for line in f if line.strip() and line.strip().startswith("@")]
|
||
return domains
|
||
except Exception:
|
||
return []
|
||
|
||
def save_domains_to_file(domains: list):
|
||
"""保存域名列表到文件"""
|
||
try:
|
||
with open(DOMAIN_FILE, "w", encoding="utf-8") as f:
|
||
f.write("\n".join(domains))
|
||
return True
|
||
except Exception:
|
||
return False
|
||
|
||
def get_email_domains():
|
||
"""获取邮箱域名列表 (合并文件和配置)"""
|
||
file_domains = set(load_domains_from_file())
|
||
config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set()
|
||
# 合并两个来源的域名
|
||
all_domains = file_domains | config_domains
|
||
return sorted(all_domains) if all_domains else []
|
||
|
||
def validate_domain_format(domain: str) -> tuple:
|
||
"""验证域名格式是否正确
|
||
|
||
Args:
|
||
domain: 要验证的域名 (带或不带@前缀)
|
||
|
||
Returns:
|
||
tuple: (是否有效, 标准化的域名或错误信息)
|
||
"""
|
||
domain = domain.strip().lower()
|
||
|
||
# 移除开头的引号和尾部特殊字符
|
||
domain = domain.strip('"\'')
|
||
|
||
# 确保以 @ 开头
|
||
if not domain.startswith("@"):
|
||
domain = "@" + domain
|
||
|
||
# 移除尾部可能的引号或逗号
|
||
domain = domain.rstrip('",\'')
|
||
|
||
# 基本长度检查 (至少 @x.y)
|
||
if len(domain) < 4:
|
||
return False, "域名太短"
|
||
|
||
# 提取 @ 后面的部分进行验证
|
||
domain_part = domain[1:] # 去掉 @
|
||
|
||
# 检查是否包含至少一个点
|
||
if "." not in domain_part:
|
||
return False, "域名缺少点号"
|
||
|
||
# 检查点的位置 (不能在开头或结尾)
|
||
if domain_part.startswith(".") or domain_part.endswith("."):
|
||
return False, "点号位置不正确"
|
||
|
||
# 检查不能有连续的点
|
||
if ".." in domain_part:
|
||
return False, "不能有连续的点号"
|
||
|
||
# 检查每个部分是否有效
|
||
parts = domain_part.split(".")
|
||
for part in parts:
|
||
if not part:
|
||
return False, "域名部分为空"
|
||
# 检查是否只包含有效字符 (字母、数字、连字符)
|
||
if not all(c.isalnum() or c == "-" for c in part):
|
||
return False, f"域名包含无效字符"
|
||
# 不能以连字符开头或结尾
|
||
if part.startswith("-") or part.endswith("-"):
|
||
return False, "域名部分不能以连字符开头或结尾"
|
||
|
||
# 顶级域名至少2个字符
|
||
if len(parts[-1]) < 2:
|
||
return False, "顶级域名太短"
|
||
|
||
return True, domain
|
||
|
||
|
||
def add_email_domains(new_domains: list) -> tuple:
|
||
"""添加域名到列表
|
||
|
||
Args:
|
||
new_domains: 新的域名列表
|
||
|
||
Returns:
|
||
tuple: (添加数量, 跳过数量, 无效数量, 当前总数)
|
||
"""
|
||
# 获取当前所有域名(文件 + 配置)
|
||
current = set(load_domains_from_file())
|
||
config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set()
|
||
all_existing = current | config_domains
|
||
|
||
added = 0
|
||
skipped = 0
|
||
invalid = 0
|
||
|
||
for domain in new_domains:
|
||
# 验证域名格式
|
||
is_valid, result = validate_domain_format(domain)
|
||
|
||
if not is_valid:
|
||
invalid += 1
|
||
continue
|
||
|
||
domain = result # 使用标准化后的域名
|
||
|
||
if domain in all_existing:
|
||
skipped += 1
|
||
else:
|
||
current.add(domain)
|
||
all_existing.add(domain)
|
||
added += 1
|
||
|
||
# 只保存通过 Bot 添加的域名到文件
|
||
save_domains_to_file(sorted(current))
|
||
return added, skipped, invalid, len(all_existing)
|
||
|
||
def remove_email_domain(domain: str) -> bool:
|
||
"""删除指定域名 (只能删除通过 Bot 添加的域名)
|
||
|
||
Args:
|
||
domain: 要删除的域名
|
||
|
||
Returns:
|
||
bool: 是否删除成功
|
||
"""
|
||
current = set(load_domains_from_file())
|
||
|
||
domain = domain.strip().lower()
|
||
if not domain.startswith("@"):
|
||
domain = "@" + domain
|
||
|
||
if domain in current:
|
||
current.remove(domain)
|
||
save_domains_to_file(sorted(current))
|
||
return True
|
||
return False
|
||
|
||
def get_file_domains_count() -> int:
|
||
"""获取txt文件中的域名数量 (不包含config配置的)"""
|
||
return len(load_domains_from_file())
|
||
|
||
|
||
def clear_email_domains() -> int:
|
||
"""清空域名列表 (只清空txt文件,保留config配置)
|
||
|
||
Returns:
|
||
int: 被清空的域名数量
|
||
"""
|
||
count = len(load_domains_from_file())
|
||
if DOMAIN_FILE.exists():
|
||
DOMAIN_FILE.unlink()
|
||
return count
|
||
|
||
# ================= 固定配置 =================
|
||
TARGET_URL = "https://chatgpt.com"
|
||
|
||
def generate_random_birthday():
|
||
"""生成随机生日 (2000-2004年)"""
|
||
year = random.randint(2000, 2004)
|
||
month = random.randint(1, 12)
|
||
# 根据月份确定天数
|
||
if month in [1, 3, 5, 7, 8, 10, 12]:
|
||
max_day = 31
|
||
elif month in [4, 6, 9, 11]:
|
||
max_day = 30
|
||
else: # 2月
|
||
max_day = 29 if year % 4 == 0 else 28
|
||
day = random.randint(1, max_day)
|
||
return str(year), f"{month:02d}", f"{day:02d}"
|
||
|
||
|
||
def _detect_page_language(page) -> str:
|
||
"""检测页面语言
|
||
|
||
Returns:
|
||
str: 'zh' 中文, 'en' 英文
|
||
"""
|
||
try:
|
||
html = page.html
|
||
# 检查中文关键词
|
||
chinese_keywords = ['确认', '继续', '登录', '注册', '验证', '邮箱', '密码', '姓名', '生日']
|
||
for keyword in chinese_keywords:
|
||
if keyword in html:
|
||
return 'zh'
|
||
return 'en'
|
||
except Exception:
|
||
return 'en' # 默认英文
|
||
|
||
|
||
def _input_birthday_precise(page, birth_year: str, birth_month: str, birth_day: str) -> bool:
|
||
"""精确输入生日 (使用 data-type 选择器定位输入框)
|
||
|
||
中文界面: yyyy/mm/dd (年/月/日)
|
||
英文界面: mm/dd/yyyy (月/日/年)
|
||
|
||
Args:
|
||
page: 浏览器页面对象
|
||
birth_year: 年份 (如 "2000")
|
||
birth_month: 月份 (如 "07")
|
||
birth_day: 日期 (如 "15")
|
||
|
||
Returns:
|
||
bool: 是否成功
|
||
"""
|
||
try:
|
||
# 使用 data-type 属性精确定位三个输入框
|
||
year_input = page.ele('css:[data-type="year"]', timeout=5)
|
||
month_input = page.ele('css:[data-type="month"]', timeout=3)
|
||
day_input = page.ele('css:[data-type="day"]', timeout=3)
|
||
|
||
if not all([year_input, month_input, day_input]):
|
||
log_progress("[!] 未找到完整的生日输入框 (data-type),尝试备用方案...")
|
||
return False
|
||
|
||
# 检测页面语言
|
||
lang = _detect_page_language(page)
|
||
log_progress(f"生日: {birth_year}-{birth_month}-{birth_day} (界面: {lang})")
|
||
|
||
# 根据语言决定输入顺序
|
||
if lang == 'zh':
|
||
# 中文: 年 -> 月 -> 日
|
||
input_order = [(year_input, birth_year),
|
||
(month_input, birth_month),
|
||
(day_input, birth_day)]
|
||
else:
|
||
# 英文: 月 -> 日 -> 年
|
||
input_order = [(month_input, birth_month),
|
||
(day_input, birth_day),
|
||
(year_input, birth_year)]
|
||
|
||
# 按顺序输入
|
||
for input_elem, value in input_order:
|
||
try:
|
||
input_elem.click()
|
||
time.sleep(0.15)
|
||
input_elem.input(value, clear=True)
|
||
time.sleep(0.2)
|
||
except Exception as e:
|
||
log_progress(f"[!] 生日字段输入异常: {e}")
|
||
return False
|
||
|
||
log_progress("[OK] 生日已输入 (精确模式)")
|
||
return True
|
||
|
||
except Exception as e:
|
||
log_progress(f"[!] 精确生日输入失败: {e}")
|
||
return False
|
||
|
||
|
||
def _input_birthday_fallback(page, birth_year: str, birth_month: str, birth_day: str):
|
||
"""备用生日输入方式 (Tab 键切换 + 逐字符输入)
|
||
|
||
Args:
|
||
page: 浏览器页面对象
|
||
birth_year: 年份
|
||
birth_month: 月份
|
||
birth_day: 日期
|
||
"""
|
||
log_progress(f"生日: {birth_year}-{birth_month}-{birth_day} (备用模式)")
|
||
|
||
# 使用 Tab 键切换到生日字段
|
||
page.actions.key_down('Tab').key_up('Tab')
|
||
time.sleep(0.3)
|
||
|
||
# 逐字符输入
|
||
birth_str = birth_year + birth_month + birth_day
|
||
for digit in birth_str:
|
||
page.actions.type(digit)
|
||
time.sleep(0.1)
|
||
|
||
log_progress("[OK] 生日已输入 (备用模式)")
|
||
|
||
# 地址格式: (街道, 邮编, 城市)
|
||
SEPA_ADDRESSES = [
|
||
# 柏林
|
||
("Alexanderplatz 1", "10178", "Berlin"),
|
||
("Unter den Linden 77", "10117", "Berlin"),
|
||
("Kurfürstendamm 21", "10719", "Berlin"),
|
||
("Friedrichstraße 43", "10117", "Berlin"),
|
||
("Potsdamer Platz 1", "10785", "Berlin"),
|
||
("Tauentzienstraße 9", "10789", "Berlin"),
|
||
# 慕尼黑
|
||
("Marienplatz 8", "80331", "München"),
|
||
("Leopoldstraße 32", "80802", "München"),
|
||
("Maximilianstraße 17", "80539", "München"),
|
||
("Kaufingerstraße 28", "80331", "München"),
|
||
("Sendlinger Straße 3", "80331", "München"),
|
||
# 汉堡
|
||
("Mönckebergstraße 16", "20095", "Hamburg"),
|
||
("Jungfernstieg 38", "20354", "Hamburg"),
|
||
("Spitalerstraße 12", "20095", "Hamburg"),
|
||
("Neuer Wall 50", "20354", "Hamburg"),
|
||
# 法兰克福
|
||
("Zeil 106", "60313", "Frankfurt am Main"),
|
||
("Kaiserstraße 62", "60329", "Frankfurt am Main"),
|
||
("Goethestraße 1", "60313", "Frankfurt am Main"),
|
||
("Große Bockenheimer Str. 2", "60313", "Frankfurt am Main"),
|
||
# 科隆
|
||
("Hohe Straße 111", "50667", "Köln"),
|
||
("Schildergasse 24", "50667", "Köln"),
|
||
("Breite Straße 80", "50667", "Köln"),
|
||
# 斯图加特
|
||
("Königstraße 2", "70173", "Stuttgart"),
|
||
("Calwer Straße 19", "70173", "Stuttgart"),
|
||
("Schulstraße 5", "70173", "Stuttgart"),
|
||
# 杜塞尔多夫
|
||
("Königsallee 60", "40212", "Düsseldorf"),
|
||
("Schadowstraße 11", "40212", "Düsseldorf"),
|
||
("Flinger Straße 36", "40213", "Düsseldorf"),
|
||
# 莱比锡
|
||
("Grimmaische Straße 25", "04109", "Leipzig"),
|
||
("Petersstraße 36", "04109", "Leipzig"),
|
||
# 德累斯顿
|
||
("Prager Straße 12", "01069", "Dresden"),
|
||
("Altmarkt 10", "01067", "Dresden"),
|
||
# 纽伦堡
|
||
("Karolinenstraße 12", "90402", "Nürnberg"),
|
||
("Breite Gasse 23", "90402", "Nürnberg"),
|
||
# 汉诺威
|
||
("Georgstraße 10", "30159", "Hannover"),
|
||
("Bahnhofstraße 5", "30159", "Hannover"),
|
||
# 不来梅
|
||
("Obernstraße 2", "28195", "Bremen"),
|
||
("Sögestraße 18", "28195", "Bremen"),
|
||
]
|
||
|
||
FIRST_NAMES = [
|
||
# 德国常见男性名
|
||
"Lukas", "Leon", "Maximilian", "Felix", "Paul", "Jonas", "Tim", "David",
|
||
"Niklas", "Jan", "Philipp", "Moritz", "Alexander", "Sebastian", "Florian",
|
||
"Julian", "Tobias", "Simon", "Daniel", "Christian", "Markus", "Thomas",
|
||
"Michael", "Stefan", "Andreas", "Martin", "Matthias", "Benjamin", "Patrick",
|
||
# 德国常见女性名
|
||
"Anna", "Laura", "Julia", "Lena", "Sarah", "Lisa", "Marie", "Sophie",
|
||
"Katharina", "Hannah", "Emma", "Mia", "Lea", "Johanna", "Clara",
|
||
"Charlotte", "Emilia", "Luisa", "Nina", "Elena", "Melanie", "Christina",
|
||
"Sandra", "Nicole", "Sabine", "Claudia", "Petra", "Monika", "Stefanie",
|
||
]
|
||
LAST_NAMES = [
|
||
# 德国最常见姓氏
|
||
"Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner",
|
||
"Becker", "Schulz", "Hoffmann", "Schäfer", "Koch", "Bauer", "Richter",
|
||
"Klein", "Wolf", "Schröder", "Neumann", "Schwarz", "Zimmermann", "Braun",
|
||
"Krüger", "Hofmann", "Hartmann", "Lange", "Schmitt", "Werner", "Schmitz",
|
||
"Krause", "Meier", "Lehmann", "Schmid", "Schulze", "Maier", "Köhler",
|
||
"Herrmann", "König", "Walter", "Mayer", "Huber", "Kaiser", "Fuchs",
|
||
"Peters", "Lang", "Scholz", "Möller", "Weiß", "Jung", "Hahn", "Schubert",
|
||
]
|
||
|
||
# ================= 工具函数 =================
|
||
|
||
# 日志去重缓存
|
||
_last_log_message = ""
|
||
_last_log_time = 0
|
||
|
||
|
||
def cleanup_chrome_processes():
|
||
"""清理残留的 Chrome 进程 (跨平台支持)"""
|
||
global _last_log_message, _last_log_time
|
||
try:
|
||
if platform.system() == "Windows":
|
||
# Windows: 使用 taskkill 清理 chromedriver 和 chrome
|
||
try:
|
||
subprocess.run(
|
||
['taskkill', '/F', '/IM', 'chromedriver.exe'],
|
||
capture_output=True, timeout=5
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
# 清理无头模式的 chrome 进程 (带 --headless 参数的)
|
||
try:
|
||
result = subprocess.run(
|
||
['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
|
||
capture_output=True, text=True, timeout=5
|
||
)
|
||
for line in result.stdout.strip().split('\n'):
|
||
pid = line.strip()
|
||
if pid.isdigit():
|
||
subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
|
||
except Exception:
|
||
pass
|
||
else:
|
||
# Linux/Mac: 使用 pkill
|
||
try:
|
||
subprocess.run(
|
||
['pkill', '-f', 'chromedriver'],
|
||
capture_output=True, timeout=5
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
# 清理无头模式的 chrome 进程
|
||
try:
|
||
subprocess.run(
|
||
['pkill', '-f', 'chrome.*--headless'],
|
||
capture_output=True, timeout=5
|
||
)
|
||
except Exception:
|
||
pass
|
||
|
||
# 避免重复日志:1秒内相同消息不重复输出
|
||
current_time = time.time()
|
||
log_msg = "已清理 Chrome 残留进程"
|
||
if log_msg != _last_log_message or current_time - _last_log_time > 1:
|
||
log_status("清理", log_msg)
|
||
_last_log_message = log_msg
|
||
_last_log_time = current_time
|
||
except Exception:
|
||
pass # 静默处理,不影响主流程
|
||
|
||
def log_status(step, message):
|
||
timestamp = time.strftime("%H:%M:%S")
|
||
print(f"[{timestamp}] [{step}] {message}")
|
||
sys.stdout.flush()
|
||
|
||
def log_progress(message):
|
||
print(f" {message}")
|
||
sys.stdout.flush()
|
||
|
||
def save_account(email, password, token, account_id=""):
|
||
"""保存账号信息到 JSON 文件"""
|
||
import json
|
||
accounts_file = "accounts.json"
|
||
|
||
# 读取现有账号
|
||
accounts = []
|
||
try:
|
||
with open(accounts_file, 'r', encoding='utf-8') as f:
|
||
accounts = json.load(f)
|
||
except:
|
||
pass
|
||
|
||
# 添加新账号
|
||
account_data = {
|
||
"account": email,
|
||
"password": password,
|
||
"token": token
|
||
}
|
||
if account_id:
|
||
account_data["account_id"] = account_id
|
||
|
||
accounts.append(account_data)
|
||
|
||
# 保存
|
||
with open(accounts_file, 'w', encoding='utf-8') as f:
|
||
json.dump(accounts, f, ensure_ascii=False, indent=2)
|
||
|
||
log_status("保存", f"账号已保存到 {accounts_file}")
|
||
|
||
def get_verification_content(target_email, max_retries=90):
|
||
log_status("API监听", f"正在监听邮件 ({MAIL_API_PATH})...")
|
||
|
||
headers = {
|
||
"Authorization": MAIL_API_TOKEN,
|
||
"Content-Type": "application/json"
|
||
}
|
||
|
||
start_time = time.time()
|
||
|
||
for i in range(max_retries):
|
||
elapsed = int(time.time() - start_time)
|
||
try:
|
||
url = f"{MAIL_API_BASE}{MAIL_API_PATH}"
|
||
payload = {
|
||
"toEmail": target_email,
|
||
"timeSort": "desc",
|
||
"size": 20
|
||
}
|
||
resp = requests.post(url, headers=headers, json=payload, timeout=10)
|
||
|
||
if resp.status_code == 200:
|
||
data = resp.json()
|
||
if data.get('code') == 200:
|
||
mails = data.get('data', [])
|
||
|
||
if mails:
|
||
for mail in mails:
|
||
log_status("捕获", "✅ 成功抓取到目标邮件!")
|
||
html_body = mail.get('content') or mail.get('text') or str(mail)
|
||
|
||
# 提取验证码
|
||
code_match = re.search(r'\b(\d{6})\b', html_body)
|
||
if code_match:
|
||
code = code_match.group(1)
|
||
log_status("解析", f"提取到验证码: {code}")
|
||
return {"type": "code", "val": code}
|
||
|
||
# 提取链接
|
||
link_match = re.search(r'href="(https://.*openai\.com/.*verification.*)"', html_body)
|
||
if not link_match:
|
||
link_match = re.search(r'href="(https://auth0\.openai\.com/u/login/identifier\?state=[^"]+)"', html_body)
|
||
if link_match:
|
||
link = link_match.group(1)
|
||
log_status("解析", f"提取到链接: {link[:30]}...")
|
||
return {"type": "link", "val": link}
|
||
except:
|
||
pass
|
||
|
||
if i % 5 == 0:
|
||
print(f" [监听中] 已耗时 {elapsed}秒...")
|
||
sys.stdout.flush()
|
||
time.sleep(2)
|
||
|
||
log_status("超时", "[X] 未能获取验证码。")
|
||
return None
|
||
|
||
|
||
def _is_shutdown_requested():
|
||
"""检查是否收到停止请求"""
|
||
try:
|
||
import run
|
||
return run._shutdown_requested
|
||
except Exception:
|
||
return False
|
||
|
||
|
||
def _is_connection_lost(error_msg):
|
||
"""检查是否是连接断开错误(通常由 /stop 命令导致)"""
|
||
disconnect_keywords = [
|
||
"connection to the page has been disconnected",
|
||
"page has been closed",
|
||
"target closed",
|
||
"session closed",
|
||
"browser has disconnected",
|
||
"no such window",
|
||
"invalid session id"
|
||
]
|
||
error_lower = str(error_msg).lower()
|
||
return any(kw in error_lower for kw in disconnect_keywords)
|
||
|
||
|
||
def run_payment_flow(page, email, step_callback=None):
|
||
"""执行 SEPA 支付流程 - 严格按顺序执行,任一步骤失败则终止
|
||
|
||
Args:
|
||
page: 浏览器页面对象
|
||
email: 邮箱地址
|
||
step_callback: 步骤回调函数 (step: str)
|
||
|
||
Returns:
|
||
dict: 成功时返回 {"token": ..., "account_id": ...}
|
||
被停止时返回 {"stopped": True}
|
||
失败时返回 None
|
||
"""
|
||
def step_cb(step):
|
||
if step_callback:
|
||
step_callback(step)
|
||
|
||
# 检查停止请求
|
||
if _is_shutdown_requested():
|
||
log_progress("[!] 检测到停止请求,中断支付流程")
|
||
return {"stopped": True}
|
||
|
||
log_status("支付流程", "开始处理 Stripe 支付页...")
|
||
|
||
try:
|
||
# 等待支付页加载完成
|
||
step_cb("等待支付页加载...")
|
||
log_status("支付页", "等待支付页加载...")
|
||
|
||
# 等待页面稳定(Stripe 支付页需要时间渲染)
|
||
time.sleep(3)
|
||
|
||
# 尝试多种方式定位邮箱输入框
|
||
email_input = None
|
||
for attempt in range(3):
|
||
# 方式1: 直接 ID
|
||
email_input = page.ele('#email', timeout=5)
|
||
if email_input:
|
||
break
|
||
|
||
# 方式2: name 属性
|
||
email_input = page.ele('@name=email', timeout=3)
|
||
if email_input:
|
||
break
|
||
|
||
# 方式3: CSS 选择器
|
||
email_input = page.ele('css:input[type="email"], input[id*="email"], input[name*="email"]', timeout=3)
|
||
if email_input:
|
||
break
|
||
|
||
log_progress(f"[尝试{attempt+1}] 等待邮箱输入框...")
|
||
time.sleep(2)
|
||
|
||
if email_input:
|
||
log_progress("[OK] 支付页已加载")
|
||
else:
|
||
log_progress("[!] 邮箱输入框未立即出现,继续尝试...")
|
||
time.sleep(3)
|
||
|
||
# 随机选择 IBAN 和地址
|
||
ibans = get_sepa_ibans()
|
||
if not ibans:
|
||
log_progress("[X] 没有可用的 IBAN,请先通过 Bot 导入")
|
||
return None
|
||
sepa_iban = random.choice(ibans)
|
||
street, postal_code, city = random.choice(SEPA_ADDRESSES)
|
||
account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
|
||
|
||
log_progress(f"使用 IBAN: {sepa_iban[:8]}...")
|
||
log_progress(f"使用地址: {street}, {postal_code} {city}")
|
||
log_progress(f"账户名: {account_name}")
|
||
|
||
# ========== 步骤 1: 填写邮箱 ==========
|
||
step_cb("填写支付邮箱...")
|
||
log_progress("[1] 填写邮箱...")
|
||
try:
|
||
# 检查停止请求
|
||
if _is_shutdown_requested():
|
||
log_progress("[!] 检测到停止请求")
|
||
return {"stopped": True}
|
||
|
||
# 如果之前已经找到了,直接使用;否则重新查找
|
||
if not email_input:
|
||
email_input = page.ele('#email', timeout=10)
|
||
if not email_input:
|
||
email_input = page.ele('@name=email', timeout=5)
|
||
if not email_input:
|
||
email_input = page.ele('css:input[type="email"]', timeout=5)
|
||
if not email_input:
|
||
log_progress("[X] 邮箱输入框未找到")
|
||
log_progress(f"当前URL: {page.url}")
|
||
return None
|
||
email_input.clear()
|
||
email_input.input(email)
|
||
log_progress(f"[OK] 已填写邮箱: {email}")
|
||
time.sleep(1)
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
if _is_connection_lost(error_msg):
|
||
log_progress("[!] 浏览器连接断开")
|
||
return {"stopped": True}
|
||
log_progress(f"[X] 邮箱填写失败: {e}")
|
||
log_progress(f"当前URL: {page.url}")
|
||
return None
|
||
|
||
# ========== 步骤 2: 选择 SEPA ==========
|
||
step_cb("选择 SEPA 支付方式...")
|
||
log_progress("[2] 选择 SEPA...")
|
||
time.sleep(1)
|
||
sepa_clicked = False
|
||
|
||
# 定位方式(按速度排序:属性选择器 > CSS > xpath)
|
||
sepa_selectors = [
|
||
'@data-testid=sepa_debit-accordion-item-button', # 最快:属性选择器
|
||
'css:button[data-testid*="sepa"]', # 快:CSS 模糊匹配
|
||
]
|
||
for selector in sepa_selectors:
|
||
try:
|
||
sepa_btn = page.ele(selector, timeout=2)
|
||
if sepa_btn:
|
||
page.run_js('arguments[0].click()', sepa_btn)
|
||
sepa_clicked = True
|
||
time.sleep(1)
|
||
break
|
||
except:
|
||
continue
|
||
|
||
if not sepa_clicked:
|
||
# 最后尝试 JS
|
||
try:
|
||
result = page.run_js('''
|
||
const btns = document.querySelectorAll('button');
|
||
for(let btn of btns) {
|
||
if(btn.innerText.includes('SEPA')) {
|
||
btn.click();
|
||
return true;
|
||
}
|
||
}
|
||
return false;
|
||
''')
|
||
if result:
|
||
sepa_clicked = True
|
||
time.sleep(1)
|
||
except:
|
||
pass
|
||
|
||
if not sepa_clicked:
|
||
log_progress("[X] SEPA 选择失败")
|
||
return None
|
||
|
||
# 验证 SEPA 是否真正展开(检查 IBAN 输入框是否出现)
|
||
try:
|
||
iban_check = page.ele('#iban', timeout=5)
|
||
if not iban_check:
|
||
log_progress("[X] SEPA 未展开")
|
||
return None
|
||
except:
|
||
log_progress("[X] SEPA 未展开")
|
||
return None
|
||
|
||
# ========== 步骤 3: 填写 IBAN ==========
|
||
step_cb("填写 IBAN...")
|
||
log_progress("[3] 填写 IBAN...")
|
||
try:
|
||
iban_input = page.ele('#iban', timeout=5)
|
||
if not iban_input:
|
||
log_progress("[X] IBAN 输入框未找到")
|
||
return None
|
||
iban_input.input(sepa_iban)
|
||
except Exception as e:
|
||
log_progress(f"[X] IBAN 填写失败: {e}")
|
||
return None
|
||
|
||
# ========== 步骤 4: 填写账户姓名 ==========
|
||
step_cb("填写账户姓名...")
|
||
log_progress("[4] 填写姓名...")
|
||
try:
|
||
name_input = page.ele('#billingName', timeout=3)
|
||
if not name_input:
|
||
name_input = page.ele('@name=billingName', timeout=2)
|
||
if name_input:
|
||
name_input.input(account_name)
|
||
except:
|
||
pass
|
||
|
||
# ========== 步骤 5: 填写地址 ==========
|
||
step_cb("填写账单地址...")
|
||
log_progress("[5] 填写地址...")
|
||
try:
|
||
addr_input = page.ele('#billingAddressLine1', timeout=1)
|
||
if not addr_input:
|
||
try:
|
||
manual_btn = page.ele('@data-testid=manual-address-entry', timeout=1)
|
||
if manual_btn:
|
||
manual_btn.click()
|
||
time.sleep(0.3)
|
||
except:
|
||
pass
|
||
addr_input = page.ele('#billingAddressLine1', timeout=2)
|
||
|
||
if addr_input:
|
||
addr_input.input(street)
|
||
postal_input = page.ele('#billingPostalCode', timeout=1)
|
||
if postal_input:
|
||
postal_input.input(postal_code)
|
||
city_input = page.ele('#billingLocality', timeout=1)
|
||
if city_input:
|
||
city_input.input(city)
|
||
page.actions.key_down('Escape').key_up('Escape')
|
||
except:
|
||
pass
|
||
|
||
# ========== 步骤 6: 勾选条款 ==========
|
||
step_cb("勾选服务条款...")
|
||
log_progress("[6] 勾选条款...")
|
||
try:
|
||
terms_checkbox = page.ele('#termsOfServiceConsentCheckbox', timeout=3)
|
||
if terms_checkbox:
|
||
terms_checkbox.click()
|
||
except:
|
||
pass
|
||
|
||
# ========== 步骤 7: 点击订阅 ==========
|
||
step_cb("提交订阅...")
|
||
log_progress("[7] 点击订阅...")
|
||
time.sleep(1)
|
||
try:
|
||
subscribe_btn = page.ele('css:button[type="submit"]', timeout=5)
|
||
if subscribe_btn:
|
||
subscribe_btn.click()
|
||
except:
|
||
page.run_js('document.querySelector("button[type=submit]").click()')
|
||
|
||
# ========== 步骤 8: 等待支付成功 ==========
|
||
step_cb("等待支付处理...")
|
||
log_status("等待", "等待支付处理(超时90秒)...")
|
||
|
||
# 使用轮询方式等待支付成功,而不是 url_change
|
||
# 因为 url_change 在 URL 没有变化时会立即返回
|
||
payment_success = False
|
||
start_time = time.time()
|
||
max_wait = 90 # 最多等待 90 秒
|
||
|
||
while time.time() - start_time < max_wait:
|
||
current_url = page.url
|
||
|
||
# 检查是否支付成功
|
||
if 'payments/success' in current_url or 'success-team' in current_url:
|
||
payment_success = True
|
||
log_status("成功", "[OK] 支付成功!")
|
||
break
|
||
|
||
# 检查是否有错误页面
|
||
if 'error' in current_url.lower() or 'failed' in current_url.lower():
|
||
log_status("失败", f"[X] 支付失败: {current_url}")
|
||
return None
|
||
|
||
# 检查停止请求
|
||
if _is_shutdown_requested():
|
||
log_progress("[!] 检测到停止请求")
|
||
return {"stopped": True}
|
||
|
||
# 每 2 秒检查一次
|
||
elapsed = int(time.time() - start_time)
|
||
if elapsed % 10 == 0 and elapsed > 0:
|
||
log_progress(f"[等待中] 已等待 {elapsed} 秒...")
|
||
|
||
time.sleep(2)
|
||
|
||
if not payment_success:
|
||
log_status("超时", "[X] 支付未完成(超时)")
|
||
log_progress(f"最终URL: {page.url}")
|
||
return None
|
||
|
||
# ========== 步骤 9: 获取 token 和 account_id ==========
|
||
step_cb("获取 Token...")
|
||
log_status("获取", "正在获取 access token...")
|
||
time.sleep(1)
|
||
page.get("https://chatgpt.com/api/auth/session")
|
||
time.sleep(1)
|
||
|
||
try:
|
||
# 获取页面内容(JSON)
|
||
session_text = page.ele('tag:pre', timeout=5).text
|
||
import json
|
||
session_data = json.loads(session_text)
|
||
access_token = session_data.get('accessToken', '')
|
||
|
||
if access_token:
|
||
log_status("成功", f"Token: {access_token[:50]}...")
|
||
|
||
# 优先从 session 数据直接提取 account_id(最快)
|
||
step_cb("获取 Account ID...")
|
||
account_id = fetch_account_id_from_session(session_data)
|
||
|
||
# 如果 session 中没有,再通过 API 获取
|
||
if not account_id:
|
||
account_id = fetch_account_id(page, access_token)
|
||
|
||
if account_id:
|
||
log_progress(f"account_id: {account_id}")
|
||
|
||
return {
|
||
"token": access_token,
|
||
"account_id": account_id
|
||
}
|
||
else:
|
||
log_progress("[X] 未找到 accessToken")
|
||
return None
|
||
except Exception as e:
|
||
log_progress(f"[X] 获取 token 失败: {e}")
|
||
return None
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
# 只有连接断开才认为是停止请求,普通异常按错误处理
|
||
if _is_connection_lost(error_msg):
|
||
log_status("停止", "[!] 浏览器连接断开,支付流程已中断")
|
||
return {"stopped": True}
|
||
log_status("错误", f"[X] 支付流程异常: {e}")
|
||
return None
|
||
|
||
|
||
def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool = True, step_callback=None):
|
||
"""使用 API 会话的 cookies 注入浏览器完成支付 (协议模式专用)
|
||
|
||
Args:
|
||
reg: ChatGPTAPIRegister 对象
|
||
email: 邮箱
|
||
proxy: 代理地址
|
||
headless: 是否无头模式
|
||
step_callback: 步骤回调
|
||
|
||
Returns:
|
||
dict: {"token": ..., "account_id": ...} 或 None
|
||
"""
|
||
def step_cb(step):
|
||
if step_callback:
|
||
step_callback(step)
|
||
|
||
step_cb("获取支付页 URL...")
|
||
|
||
# 通过 API 获取支付页 URL
|
||
checkout_url = reg.get_checkout_url()
|
||
if not checkout_url:
|
||
log_status("失败", "无法获取支付页 URL")
|
||
return None
|
||
|
||
log_progress(f"[OK] 支付页: {checkout_url[:60]}...")
|
||
|
||
# 获取 cookies
|
||
cookies = reg.get_cookies()
|
||
log_status("Cookie", f"获取到 {len(cookies)} 个 cookies")
|
||
|
||
# 启动浏览器
|
||
step_cb("启动浏览器...")
|
||
temp_user_data = tempfile.mkdtemp(prefix="chrome_api_")
|
||
|
||
# 检测操作系统
|
||
is_linux = platform.system() == "Linux"
|
||
|
||
# 获取随机指纹
|
||
fingerprint = None
|
||
if RANDOM_FINGERPRINT:
|
||
fingerprint = get_random_fingerprint()
|
||
else:
|
||
fingerprint = {
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
}
|
||
|
||
co = ChromiumOptions()
|
||
co.set_argument('--no-first-run')
|
||
co.set_argument('--no-default-browser-check')
|
||
co.set_argument(f'--user-data-dir={temp_user_data}')
|
||
co.set_argument('--disable-blink-features=AutomationControlled')
|
||
co.set_argument('--disable-infobars')
|
||
co.set_argument('--disable-dev-shm-usage')
|
||
co.set_argument('--no-sandbox')
|
||
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
|
||
|
||
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
|
||
|
||
if headless:
|
||
co.set_argument('--headless=new')
|
||
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
|
||
else:
|
||
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
|
||
|
||
if proxy:
|
||
co.set_argument(f'--proxy-server={proxy}')
|
||
|
||
if is_linux:
|
||
co.set_argument('--disable-software-rasterizer')
|
||
co.set_argument('--disable-extensions')
|
||
co.set_argument('--disable-setuid-sandbox')
|
||
co.set_argument('--single-process')
|
||
co.set_argument('--remote-debugging-port=0')
|
||
chrome_paths = [
|
||
'/usr/bin/google-chrome',
|
||
'/usr/bin/google-chrome-stable',
|
||
'/usr/bin/chromium-browser',
|
||
'/usr/bin/chromium',
|
||
'/snap/bin/chromium',
|
||
]
|
||
for chrome_path in chrome_paths:
|
||
if os.path.exists(chrome_path):
|
||
co.set_browser_path(chrome_path)
|
||
break
|
||
else:
|
||
co.auto_port(True)
|
||
co.set_local_port(random.randint(19222, 29999))
|
||
|
||
log_status("浏览器", f"正在启动 ({'无头' if headless else '有头'}模式)...")
|
||
page = ChromiumPage(co)
|
||
|
||
try:
|
||
# 注入指纹
|
||
if RANDOM_FINGERPRINT:
|
||
inject_fingerprint(page, fingerprint)
|
||
|
||
# 先访问 chatgpt.com 注入 cookies
|
||
step_cb("注入登录状态...")
|
||
log_status("Cookie", "注入登录状态...")
|
||
page.get("https://chatgpt.com")
|
||
|
||
injected_count = 0
|
||
for cookie in cookies:
|
||
try:
|
||
if 'chatgpt.com' in cookie.get('domain', ''):
|
||
page.set.cookies({
|
||
'name': cookie['name'],
|
||
'value': cookie['value'],
|
||
'domain': cookie['domain'].lstrip('.'),
|
||
'path': cookie.get('path', '/'),
|
||
})
|
||
injected_count += 1
|
||
except:
|
||
pass
|
||
|
||
log_progress(f"[OK] 已注入 {injected_count} 个 cookies")
|
||
|
||
# 刷新页面确保 cookies 生效
|
||
time.sleep(1)
|
||
page.refresh()
|
||
time.sleep(1)
|
||
|
||
# 直接跳转到支付页
|
||
step_cb("跳转到支付页...")
|
||
log_status("订阅", "跳转到支付页...")
|
||
page.get(checkout_url)
|
||
|
||
try:
|
||
page.wait.url_change('pay.openai.com', timeout=15)
|
||
log_progress("[OK] 已跳转到支付页")
|
||
except:
|
||
time.sleep(2)
|
||
|
||
# 执行支付流程
|
||
step_cb("执行 SEPA 支付...")
|
||
result = run_payment_flow(page, email, step_cb)
|
||
return result
|
||
|
||
except Exception as e:
|
||
log_status("错误", f"浏览器流程异常: {e}")
|
||
return None
|
||
finally:
|
||
try:
|
||
page.quit()
|
||
except:
|
||
pass
|
||
# 清理临时目录
|
||
try:
|
||
import shutil
|
||
shutil.rmtree(temp_user_data, ignore_errors=True)
|
||
except:
|
||
pass
|
||
|
||
|
||
def fetch_account_id(page, access_token: str) -> str:
|
||
"""通过 API 获取 account_id (使用 requests 直接请求,更快)"""
|
||
log_status("获取", "正在获取 account_id...")
|
||
try:
|
||
# 直接使用 requests 请求 API,比 page.get() + JS fetch 快得多
|
||
headers = {
|
||
"Authorization": f"Bearer {access_token}",
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
|
||
}
|
||
resp = requests.get(
|
||
"https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27",
|
||
headers=headers,
|
||
timeout=10
|
||
)
|
||
|
||
if resp.status_code == 200:
|
||
data = resp.json()
|
||
accounts = data.get("accounts", {})
|
||
|
||
# 优先查找 Team 账户
|
||
for acc_id, acc_info in accounts.items():
|
||
if acc_id == "default":
|
||
continue
|
||
account_data = acc_info.get("account", {})
|
||
plan_type = account_data.get("plan_type", "")
|
||
if "team" in plan_type.lower():
|
||
log_status("成功", f"获取到 account_id: {acc_id[:8]}...")
|
||
return acc_id
|
||
|
||
# 取第一个非 default 的
|
||
for acc_id in accounts.keys():
|
||
if acc_id != "default":
|
||
log_status("成功", f"获取到 account_id: {acc_id[:8]}...")
|
||
return acc_id
|
||
else:
|
||
log_progress(f"API 请求失败: {resp.status_code}")
|
||
except Exception as e:
|
||
log_progress(f"获取 account_id 失败: {e}")
|
||
|
||
return ""
|
||
|
||
|
||
def fetch_account_id_from_session(session_data: dict) -> str:
|
||
"""直接从 session 数据中提取 account_id (最快方式)"""
|
||
try:
|
||
account = session_data.get("account", {})
|
||
account_id = account.get("id", "")
|
||
if account_id:
|
||
log_status("成功", f"获取到 account_id: {account_id[:8]}...")
|
||
return account_id
|
||
except Exception as e:
|
||
log_progress(f"从 session 提取 account_id 失败: {e}")
|
||
return ""
|
||
|
||
|
||
def run_main_process():
|
||
# 检查必要配置
|
||
if not MAIL_API_TOKEN or not MAIL_API_BASE or not EMAIL_DOMAINS:
|
||
print("\n" + "="*60)
|
||
print("[X] 配置错误: 请在 config.toml 中配置 [autogptplus] 段")
|
||
print(" - mail_api_token: Cloud Mail API Token")
|
||
print(" - mail_api_base: Cloud Mail API 地址")
|
||
print(" - email_domains: 可用邮箱域名列表")
|
||
print("="*60 + "\n")
|
||
return
|
||
|
||
# 清理可能残留的 Chrome 调试进程
|
||
cleanup_chrome_processes()
|
||
|
||
# === 1. 生成 15 位随机账号 + 同密码 ===
|
||
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
|
||
domains = get_email_domains()
|
||
if not domains:
|
||
print("\n" + "="*60)
|
||
print("[X] 配置错误: 没有可用的邮箱域名")
|
||
print(" 请在 config.toml 中配置 email_domains 或通过 Bot 添加")
|
||
print("="*60 + "\n")
|
||
return
|
||
email_domain = random.choice(domains)
|
||
email = f"{random_str}{email_domain}"
|
||
# 生成符合要求的密码:大小写字母+数字+特殊字符,至少12位
|
||
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
|
||
''.join(random.choices(string.ascii_lowercase, k=8)) + \
|
||
''.join(random.choices(string.digits, k=2)) + \
|
||
random.choice('!@#$%')
|
||
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
|
||
|
||
print("\n" + "="*60)
|
||
log_status("初始化", f"生成账号: {email}")
|
||
log_status("初始化", f"设置密码: {password}")
|
||
print("="*60 + "\n")
|
||
|
||
# 检测操作系统
|
||
is_linux = platform.system() == "Linux"
|
||
|
||
# 获取随机指纹
|
||
fingerprint = None
|
||
if RANDOM_FINGERPRINT:
|
||
fingerprint = get_random_fingerprint()
|
||
log_status("指纹", f"{fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}")
|
||
else:
|
||
fingerprint = {
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
}
|
||
log_status("指纹", "使用默认指纹 (RTX 3060)")
|
||
|
||
# 配置 DrissionPage - 与项目 browser_automation.py 保持一致
|
||
co = ChromiumOptions()
|
||
co.set_argument('--no-first-run') # 跳过首次运行
|
||
co.set_argument('--disable-infobars')
|
||
co.set_argument('--incognito') # 无痕模式
|
||
co.set_argument('--disable-gpu') # 减少资源占用
|
||
co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题
|
||
co.set_argument('--no-sandbox') # 服务器环境需要
|
||
co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征
|
||
co.set_argument('--lang=zh-CN') # 设置语言为中文简体
|
||
co.set_argument(f'--user-agent={fingerprint["user_agent"]}') # 设置 User-Agent
|
||
|
||
# Linux 服务器特殊配置
|
||
if is_linux:
|
||
co.set_argument('--disable-software-rasterizer')
|
||
co.set_argument('--disable-extensions')
|
||
co.set_argument('--disable-setuid-sandbox')
|
||
co.set_argument('--single-process') # 某些 Linux 环境需要
|
||
co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口
|
||
|
||
# 尝试查找 Chrome/Chromium 路径
|
||
chrome_paths = [
|
||
'/usr/bin/google-chrome',
|
||
'/usr/bin/google-chrome-stable',
|
||
'/usr/bin/chromium-browser',
|
||
'/usr/bin/chromium',
|
||
'/snap/bin/chromium',
|
||
]
|
||
for chrome_path in chrome_paths:
|
||
if os.path.exists(chrome_path):
|
||
co.set_browser_path(chrome_path)
|
||
log_status("浏览器", f"使用浏览器: {chrome_path}")
|
||
break
|
||
else:
|
||
co.auto_port(True) # Windows 使用自动分配端口
|
||
co.set_local_port(random.randint(19222, 29999)) # 备用:手动设置随机端口
|
||
|
||
# 无头模式 (服务器运行)
|
||
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
|
||
co.set_argument('--headless=new')
|
||
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
|
||
|
||
log_status("浏览器", f"正在启动浏览器 (无头模式, {'Linux' if is_linux else 'Windows'})...")
|
||
try:
|
||
page = ChromiumPage(co)
|
||
# 注入指纹伪装
|
||
if RANDOM_FINGERPRINT:
|
||
inject_fingerprint(page, fingerprint)
|
||
except Exception as e:
|
||
log_status("浏览器", f"首次启动失败: {e},尝试清理后重试...")
|
||
# 清理残留进程后重试
|
||
cleanup_chrome_processes()
|
||
time.sleep(2)
|
||
|
||
# 重新配置
|
||
co2 = ChromiumOptions()
|
||
co2.set_argument('--no-first-run')
|
||
co2.set_argument('--disable-infobars')
|
||
co2.set_argument('--incognito')
|
||
co2.set_argument('--disable-gpu')
|
||
co2.set_argument('--disable-dev-shm-usage')
|
||
co2.set_argument('--no-sandbox')
|
||
co2.set_argument('--disable-blink-features=AutomationControlled')
|
||
co2.set_argument('--lang=zh-CN')
|
||
co2.set_argument(f'--user-agent={fingerprint["user_agent"]}')
|
||
co2.set_argument('--headless=new')
|
||
co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
|
||
|
||
if is_linux:
|
||
co2.set_argument('--disable-software-rasterizer')
|
||
co2.set_argument('--disable-extensions')
|
||
co2.set_argument('--disable-setuid-sandbox')
|
||
co2.set_argument('--single-process')
|
||
co2.set_argument('--remote-debugging-port=0')
|
||
for chrome_path in chrome_paths:
|
||
if os.path.exists(chrome_path):
|
||
co2.set_browser_path(chrome_path)
|
||
break
|
||
else:
|
||
co2.set_local_port(random.randint(30000, 39999))
|
||
|
||
page = ChromiumPage(co2)
|
||
# 注入指纹伪装
|
||
if RANDOM_FINGERPRINT:
|
||
inject_fingerprint(page, fingerprint)
|
||
|
||
try:
|
||
# === 注册流程 ===
|
||
log_status("步骤 1/5", "打开 ChatGPT 注册页...")
|
||
page.get(TARGET_URL)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
# 使用 data-testid 定位登录按钮
|
||
login_btn = page.ele('@data-testid=login-button', timeout=30)
|
||
if not login_btn:
|
||
login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
|
||
login_btn.click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
log_progress("填入邮箱...")
|
||
email_input = page.ele('@name=email', timeout=30)
|
||
email_input.input(email)
|
||
page.ele('xpath://button[@type="submit"]').click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
log_progress("填入密码...")
|
||
password_input = page.ele('xpath://input[@type="password"]', timeout=30)
|
||
password_input.input(password)
|
||
page.ele('xpath://button[@type="submit"]').click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
log_status("步骤 2/5", "等待邮件 (All Mail)...")
|
||
|
||
# === 接码与验证 ===
|
||
verify_data = get_verification_content(email)
|
||
|
||
if verify_data:
|
||
log_status("步骤 3/5", "执行验证...")
|
||
if verify_data['type'] == 'link':
|
||
page.new_tab(verify_data['val'])
|
||
time.sleep(5)
|
||
log_progress(f"当前URL: {page.url}")
|
||
elif verify_data['type'] == 'code':
|
||
code = verify_data['val']
|
||
log_progress(f"填入验证码: {code}")
|
||
try:
|
||
code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
|
||
code_input.input(code)
|
||
|
||
# 点击继续(使用 type=submit)
|
||
time.sleep(1)
|
||
try:
|
||
log_progress("尝试点击继续按钮...")
|
||
continue_btn = page.ele('css:button[type="submit"]', timeout=5)
|
||
if continue_btn:
|
||
continue_btn.click()
|
||
except:
|
||
log_progress("未找到按钮或已自动跳转...")
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
except Exception as e:
|
||
log_progress(f"验证码填入异常: {e}")
|
||
|
||
# === 资料填写 (姓名+生日) ===
|
||
log_status("步骤 4/5", "进入信息填写页...")
|
||
log_progress(f"当前URL: {page.url}")
|
||
try:
|
||
name_input = page.ele('@name=name', timeout=20)
|
||
name_input.input(real_name)
|
||
log_progress(f"姓名: {real_name}")
|
||
|
||
# 生成随机生日
|
||
birth_year, birth_month, birth_day = generate_random_birthday()
|
||
|
||
# 优先使用精确输入,失败则使用备用方案
|
||
if not _input_birthday_precise(page, birth_year, birth_month, birth_day):
|
||
_input_birthday_fallback(page, birth_year, birth_month, birth_day)
|
||
|
||
time.sleep(1.5)
|
||
# 点击继续/完成注册(使用 type=submit)
|
||
final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
|
||
final_reg_btn.click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
# === 检测提交后是否有错误或需要额外操作 ===
|
||
submit_success = False
|
||
for check_attempt in range(10):
|
||
current_url = page.url
|
||
|
||
# 检查是否已跳转出 about-you 页面
|
||
if 'about-you' not in current_url:
|
||
log_progress(f"[OK] 页面已跳转: {current_url[:50]}...")
|
||
submit_success = True
|
||
break
|
||
|
||
# 检查是否有错误提示
|
||
try:
|
||
error_elem = page.ele('css:[role="alert"], [class*="error"], [class*="Error"]', timeout=1)
|
||
if error_elem and error_elem.text:
|
||
log_progress(f"[!] 发现错误提示: {error_elem.text[:50]}")
|
||
except:
|
||
pass
|
||
|
||
# 检查是否有未勾选的 checkbox(如服务条款)
|
||
try:
|
||
unchecked = page.ele('css:input[type="checkbox"]:not(:checked)', timeout=1)
|
||
if unchecked:
|
||
log_progress("发现未勾选的选项,尝试勾选...")
|
||
unchecked.click()
|
||
time.sleep(0.5)
|
||
# 重新点击提交
|
||
final_reg_btn = page.ele('css:button[type="submit"]', timeout=5)
|
||
if final_reg_btn:
|
||
final_reg_btn.click()
|
||
time.sleep(2)
|
||
except:
|
||
pass
|
||
|
||
# 检查是否有 CAPTCHA
|
||
try:
|
||
captcha = page.ele('css:iframe[src*="captcha"], iframe[src*="recaptcha"], [class*="captcha"]', timeout=1)
|
||
if captcha:
|
||
log_progress("[!] 检测到 CAPTCHA,需要人工处理或等待...")
|
||
except:
|
||
pass
|
||
|
||
time.sleep(1)
|
||
|
||
if not submit_success:
|
||
log_progress(f"[!] 提交后仍在 about-you 页面,当前URL: {page.url}")
|
||
|
||
# =======================================================
|
||
# 【关键节点】等待进入主页后再执行 JS 跳转到支付页
|
||
# =======================================================
|
||
log_status("订阅", "等待进入 ChatGPT 主页...")
|
||
|
||
# 快速检测循环(减少等待时间)
|
||
entered_main = False
|
||
for i in range(20): # 最多等待 10 秒
|
||
current_url = page.url
|
||
# 更宽松的判断:只要不在 auth/login 页面就认为进入了主页
|
||
if 'chatgpt.com' in current_url:
|
||
if '/auth' not in current_url and '/login' not in current_url and 'auth0' not in current_url:
|
||
log_progress(f"[OK] 已进入主页: {current_url}")
|
||
entered_main = True
|
||
break
|
||
time.sleep(0.5) # 减少等待间隔
|
||
|
||
if not entered_main:
|
||
log_progress("[!] 等待主页超时,尝试继续...")
|
||
log_progress(f"当前URL: {page.url}")
|
||
# 尝试直接访问主页
|
||
page.get("https://chatgpt.com/")
|
||
time.sleep(2)
|
||
log_progress(f"跳转后URL: {page.url}")
|
||
|
||
log_status("订阅", "执行 JS 跳转到支付页...")
|
||
|
||
# 直接执行 JS 跳转到支付页(无需额外等待,JS 会自动获取 session)
|
||
checkout_js = '''
|
||
(async function(){
|
||
try {
|
||
const t = await(await fetch("/api/auth/session")).json();
|
||
if(!t.accessToken){
|
||
return "请先登录ChatGPT!";
|
||
}
|
||
const p = {
|
||
plan_name: "chatgptteamplan",
|
||
team_plan_data: {
|
||
workspace_name: "Sepa",
|
||
price_interval: "month",
|
||
seat_quantity: 5
|
||
},
|
||
billing_details: {
|
||
country: "DE",
|
||
currency: "EUR"
|
||
},
|
||
promo_campaign: {
|
||
promo_campaign_id: "team-1-month-free",
|
||
is_coupon_from_query_param: true
|
||
},
|
||
checkout_ui_mode: "redirect"
|
||
};
|
||
const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
|
||
method: "POST",
|
||
headers: {
|
||
Authorization: "Bearer " + t.accessToken,
|
||
"Content-Type": "application/json"
|
||
},
|
||
body: JSON.stringify(p)
|
||
});
|
||
const d = await r.json();
|
||
if(d.url){
|
||
window.location.href = d.url;
|
||
return "success";
|
||
} else {
|
||
return "提取失败:" + (d.detail || JSON.stringify(d));
|
||
}
|
||
} catch(e) {
|
||
return "发生错误:" + e;
|
||
}
|
||
})();
|
||
'''
|
||
result = page.run_js(checkout_js)
|
||
log_progress(f"JS 执行结果: {result}")
|
||
|
||
# 等待跳转到支付页(使用 URL 检测代替固定等待)
|
||
try:
|
||
page.wait.url_change('pay.openai.com', timeout=15)
|
||
log_progress("[OK] 已跳转到支付页")
|
||
log_progress(f"支付页URL: {page.url}")
|
||
except:
|
||
time.sleep(1)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
# 执行支付流程
|
||
result = run_payment_flow(page, email)
|
||
|
||
# 保存账号信息
|
||
if result and result.get("token"):
|
||
save_account(
|
||
email,
|
||
password,
|
||
result["token"],
|
||
result.get("account_id", "")
|
||
)
|
||
|
||
except Exception as e:
|
||
log_status("崩溃", f"注册/订阅转换阶段异常: {e}")
|
||
|
||
else:
|
||
log_status("失败", "未能找到验证码。")
|
||
|
||
except Exception as e:
|
||
log_status("崩溃", f"全局错误: {e}")
|
||
|
||
print("\n" + "="*60)
|
||
input("按回车键退出...")
|
||
page.quit()
|
||
|
||
|
||
def run_single_registration(progress_callback=None, step_callback=None) -> dict:
|
||
"""执行单次注册流程 (供 Bot 调用)
|
||
|
||
Args:
|
||
progress_callback: 进度回调函数 (message: str) - 用于日志
|
||
step_callback: 步骤回调函数 (step: str) - 用于更新 Bot 显示的当前步骤
|
||
|
||
Returns:
|
||
dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str}
|
||
"""
|
||
def log_cb(msg):
|
||
if progress_callback:
|
||
progress_callback(msg)
|
||
else:
|
||
log_progress(msg)
|
||
|
||
def step_cb(step):
|
||
if step_callback:
|
||
step_callback(step)
|
||
|
||
# 检查必要配置
|
||
if not MAIL_API_TOKEN or not MAIL_API_BASE:
|
||
return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"}
|
||
|
||
# 检查域名
|
||
domains = get_email_domains()
|
||
if not domains:
|
||
return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"}
|
||
|
||
# 检查 IBAN
|
||
ibans = get_sepa_ibans()
|
||
if not ibans:
|
||
return {"success": False, "error": "没有可用的 IBAN,请先通过 /iban_add 导入"}
|
||
|
||
step_cb("生成账号信息...")
|
||
|
||
# 生成账号信息
|
||
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
|
||
email_domain = random.choice(domains)
|
||
email = f"{random_str}{email_domain}"
|
||
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
|
||
''.join(random.choices(string.ascii_lowercase, k=8)) + \
|
||
''.join(random.choices(string.digits, k=2)) + \
|
||
random.choice('!@#$%')
|
||
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
|
||
|
||
log_status("初始化", f"生成账号: {email}")
|
||
log_status("初始化", f"设置密码: {password}")
|
||
|
||
# 检测操作系统
|
||
is_linux = platform.system() == "Linux"
|
||
|
||
step_cb("启动浏览器...")
|
||
|
||
# 获取随机指纹
|
||
fingerprint = None
|
||
if RANDOM_FINGERPRINT:
|
||
fingerprint = get_random_fingerprint()
|
||
log_status("指纹", f"已注入: {fingerprint['webgl_renderer'][:40]}...")
|
||
else:
|
||
fingerprint = {
|
||
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
|
||
"platform": "Win32",
|
||
"webgl_vendor": "Google Inc. (NVIDIA)",
|
||
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
|
||
"screen": {"width": 1920, "height": 1080}
|
||
}
|
||
|
||
# 配置浏览器
|
||
co = ChromiumOptions()
|
||
co.set_argument('--no-first-run')
|
||
co.set_argument('--disable-infobars')
|
||
co.set_argument('--incognito')
|
||
co.set_argument('--disable-gpu')
|
||
co.set_argument('--disable-dev-shm-usage')
|
||
co.set_argument('--no-sandbox')
|
||
co.set_argument('--disable-blink-features=AutomationControlled')
|
||
co.set_argument('--lang=zh-CN')
|
||
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
|
||
|
||
if is_linux:
|
||
co.set_argument('--disable-software-rasterizer')
|
||
co.set_argument('--disable-extensions')
|
||
co.set_argument('--disable-setuid-sandbox')
|
||
co.set_argument('--single-process')
|
||
co.set_argument('--remote-debugging-port=0')
|
||
chrome_paths = [
|
||
'/usr/bin/google-chrome',
|
||
'/usr/bin/google-chrome-stable',
|
||
'/usr/bin/chromium-browser',
|
||
'/usr/bin/chromium',
|
||
'/snap/bin/chromium',
|
||
]
|
||
for chrome_path in chrome_paths:
|
||
if os.path.exists(chrome_path):
|
||
co.set_browser_path(chrome_path)
|
||
break
|
||
else:
|
||
co.auto_port(True)
|
||
co.set_local_port(random.randint(19222, 29999))
|
||
|
||
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
|
||
co.set_argument('--headless=new')
|
||
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
|
||
|
||
page = None
|
||
try:
|
||
page = ChromiumPage(co)
|
||
if RANDOM_FINGERPRINT:
|
||
inject_fingerprint(page, fingerprint)
|
||
except Exception as e:
|
||
log_status("浏览器", f"启动失败: {e},尝试清理后重试...")
|
||
cleanup_chrome_processes()
|
||
time.sleep(2)
|
||
|
||
# 重新配置
|
||
co2 = ChromiumOptions()
|
||
co2.set_argument('--no-first-run')
|
||
co2.set_argument('--disable-infobars')
|
||
co2.set_argument('--incognito')
|
||
co2.set_argument('--disable-gpu')
|
||
co2.set_argument('--disable-dev-shm-usage')
|
||
co2.set_argument('--no-sandbox')
|
||
co2.set_argument('--disable-blink-features=AutomationControlled')
|
||
co2.set_argument('--lang=zh-CN')
|
||
co2.set_argument(f'--user-agent={fingerprint["user_agent"]}')
|
||
co2.set_argument('--headless=new')
|
||
co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
|
||
|
||
if is_linux:
|
||
co2.set_argument('--disable-software-rasterizer')
|
||
co2.set_argument('--disable-extensions')
|
||
co2.set_argument('--disable-setuid-sandbox')
|
||
co2.set_argument('--single-process')
|
||
co2.set_argument('--remote-debugging-port=0')
|
||
for chrome_path in chrome_paths:
|
||
if os.path.exists(chrome_path):
|
||
co2.set_browser_path(chrome_path)
|
||
break
|
||
else:
|
||
co2.auto_port(True)
|
||
co2.set_local_port(random.randint(30000, 39999))
|
||
|
||
try:
|
||
page = ChromiumPage(co2)
|
||
if RANDOM_FINGERPRINT:
|
||
inject_fingerprint(page, fingerprint)
|
||
except Exception as e2:
|
||
return {"success": False, "error": f"浏览器启动失败: {e2}", "account": email, "password": password}
|
||
|
||
try:
|
||
step_cb("打开注册页面...")
|
||
log_status("步骤 1/5", "打开 ChatGPT 注册页...")
|
||
page.get(TARGET_URL)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
step_cb("点击登录按钮...")
|
||
login_btn = page.ele('@data-testid=login-button', timeout=30)
|
||
if not login_btn:
|
||
login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
|
||
login_btn.click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
step_cb("填写邮箱...")
|
||
log_progress("填入邮箱...")
|
||
email_input = page.ele('@name=email', timeout=30)
|
||
email_input.input(email)
|
||
page.ele('xpath://button[@type="submit"]').click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
step_cb("填写密码...")
|
||
log_progress("填入密码...")
|
||
password_input = page.ele('xpath://input[@type="password"]', timeout=30)
|
||
password_input.input(password)
|
||
page.ele('xpath://button[@type="submit"]').click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
step_cb("等待验证邮件...")
|
||
log_status("步骤 2/5", "等待邮件 (All Mail)...")
|
||
verify_data = get_verification_content(email)
|
||
|
||
if not verify_data:
|
||
return {"success": False, "error": "未能获取验证码", "account": email, "password": password}
|
||
|
||
step_cb("执行邮箱验证...")
|
||
log_status("步骤 3/5", "执行验证...")
|
||
if verify_data['type'] == 'link':
|
||
page.new_tab(verify_data['val'])
|
||
time.sleep(5)
|
||
log_progress(f"当前URL: {page.url}")
|
||
elif verify_data['type'] == 'code':
|
||
code = verify_data['val']
|
||
log_progress(f"填入验证码: {code}")
|
||
code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
|
||
code_input.input(code)
|
||
time.sleep(1)
|
||
try:
|
||
log_progress("尝试点击继续按钮...")
|
||
continue_btn = page.ele('css:button[type="submit"]', timeout=5)
|
||
if continue_btn:
|
||
continue_btn.click()
|
||
except:
|
||
log_progress("未找到按钮或已自动跳转...")
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
# 资料填写
|
||
step_cb("填写个人信息...")
|
||
log_status("步骤 4/5", "进入信息填写页...")
|
||
log_progress(f"当前URL: {page.url}")
|
||
name_input = page.ele('@name=name', timeout=20)
|
||
name_input.input(real_name)
|
||
log_progress(f"姓名: {real_name}")
|
||
|
||
# 生成随机生日
|
||
birth_year, birth_month, birth_day = generate_random_birthday()
|
||
|
||
# 优先使用精确输入,失败则使用备用方案
|
||
if not _input_birthday_precise(page, birth_year, birth_month, birth_day):
|
||
_input_birthday_fallback(page, birth_year, birth_month, birth_day)
|
||
|
||
time.sleep(1.5)
|
||
final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
|
||
final_reg_btn.click()
|
||
time.sleep(2)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
# === 检测提交后是否有错误或需要额外操作 ===
|
||
submit_success = False
|
||
for check_attempt in range(10):
|
||
current_url = page.url
|
||
|
||
# 检查是否已跳转出 about-you 页面
|
||
if 'about-you' not in current_url:
|
||
log_progress(f"[OK] 页面已跳转: {current_url[:50]}...")
|
||
submit_success = True
|
||
break
|
||
|
||
# 检查是否有错误提示
|
||
try:
|
||
error_elem = page.ele('css:[role="alert"], [class*="error"], [class*="Error"]', timeout=1)
|
||
if error_elem and error_elem.text:
|
||
log_progress(f"[!] 发现错误提示: {error_elem.text[:50]}")
|
||
except:
|
||
pass
|
||
|
||
# 检查是否有未勾选的 checkbox(如服务条款)
|
||
try:
|
||
unchecked = page.ele('css:input[type="checkbox"]:not(:checked)', timeout=1)
|
||
if unchecked:
|
||
log_progress("发现未勾选的选项,尝试勾选...")
|
||
unchecked.click()
|
||
time.sleep(0.5)
|
||
# 重新点击提交
|
||
final_reg_btn = page.ele('css:button[type="submit"]', timeout=5)
|
||
if final_reg_btn:
|
||
final_reg_btn.click()
|
||
time.sleep(2)
|
||
except:
|
||
pass
|
||
|
||
# 检查是否有 CAPTCHA
|
||
try:
|
||
captcha = page.ele('css:iframe[src*="captcha"], iframe[src*="recaptcha"], [class*="captcha"]', timeout=1)
|
||
if captcha:
|
||
log_progress("[!] 检测到 CAPTCHA,需要人工处理或等待...")
|
||
except:
|
||
pass
|
||
|
||
time.sleep(1)
|
||
|
||
if not submit_success:
|
||
log_progress(f"[!] 提交后仍在 about-you 页面,当前URL: {page.url}")
|
||
|
||
# 等待进入主页 - 快速检测
|
||
step_cb("等待进入主页...")
|
||
log_status("订阅", "等待进入 ChatGPT 主页...")
|
||
|
||
# 快速检测循环(减少等待时间)
|
||
entered_main = False
|
||
for i in range(20): # 最多等待 10 秒
|
||
current_url = page.url
|
||
# 更宽松的判断:只要不在 auth/login 页面就认为进入了主页
|
||
if 'chatgpt.com' in current_url:
|
||
if '/auth' not in current_url and '/login' not in current_url and 'auth0' not in current_url:
|
||
log_progress(f"[OK] 已进入主页: {current_url}")
|
||
entered_main = True
|
||
break
|
||
time.sleep(0.5) # 减少等待间隔
|
||
|
||
if not entered_main:
|
||
log_progress("[!] 等待主页超时,尝试继续...")
|
||
log_progress(f"当前URL: {page.url}")
|
||
# 尝试直接访问主页
|
||
page.get("https://chatgpt.com/")
|
||
time.sleep(2)
|
||
log_progress(f"跳转后URL: {page.url}")
|
||
|
||
# 跳转到支付页
|
||
step_cb("跳转到支付页...")
|
||
log_status("订阅", "执行 JS 跳转到支付页...")
|
||
|
||
# 直接执行 JS 跳转到支付页(无需额外等待)
|
||
checkout_js = '''
|
||
(async function(){
|
||
try {
|
||
const t = await(await fetch("/api/auth/session")).json();
|
||
if(!t.accessToken){ return "请先登录ChatGPT!"; }
|
||
const p = {
|
||
plan_name: "chatgptteamplan",
|
||
team_plan_data: { workspace_name: "Sepa", price_interval: "month", seat_quantity: 5 },
|
||
billing_details: { country: "DE", currency: "EUR" },
|
||
promo_campaign: { promo_campaign_id: "team-1-month-free", is_coupon_from_query_param: true },
|
||
checkout_ui_mode: "redirect"
|
||
};
|
||
const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
|
||
method: "POST",
|
||
headers: { Authorization: "Bearer " + t.accessToken, "Content-Type": "application/json" },
|
||
body: JSON.stringify(p)
|
||
});
|
||
const d = await r.json();
|
||
if(d.url){ window.location.href = d.url; return "success"; }
|
||
else { return "提取失败:" + (d.detail || JSON.stringify(d)); }
|
||
} catch(e) { return "发生错误:" + e; }
|
||
})();
|
||
'''
|
||
result = page.run_js(checkout_js)
|
||
log_progress(f"JS 执行结果: {result}")
|
||
|
||
# 等待跳转到支付页
|
||
try:
|
||
page.wait.url_change('pay.openai.com', timeout=15)
|
||
log_progress("[OK] 已跳转到支付页")
|
||
log_progress(f"支付页URL: {page.url}")
|
||
except:
|
||
time.sleep(1)
|
||
log_progress(f"当前URL: {page.url}")
|
||
|
||
# 执行支付流程
|
||
step_cb("执行 SEPA 支付...")
|
||
result = run_payment_flow(page, email, step_cb)
|
||
|
||
if result and result.get("stopped"):
|
||
# 被 /stop 命令中断
|
||
log_status("停止", "[!] 注册被用户停止")
|
||
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
|
||
elif result and result.get("token"):
|
||
step_cb("注册成功!")
|
||
log_status("完成", "[OK] 注册成功!")
|
||
return {
|
||
"success": True,
|
||
"account": email,
|
||
"password": password,
|
||
"token": result["token"],
|
||
"account_id": result.get("account_id", "")
|
||
}
|
||
else:
|
||
log_status("失败", "注册失败: 支付流程失败")
|
||
return {"success": False, "error": "支付流程失败", "account": email, "password": password}
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
# 只有连接断开才认为是停止请求
|
||
if _is_connection_lost(error_msg):
|
||
log_status("停止", "[!] 浏览器连接断开")
|
||
return {"success": False, "error": "浏览器连接断开", "stopped": True, "account": email, "password": password}
|
||
log_status("错误", f"注册异常: {e}")
|
||
return {"success": False, "error": str(e), "account": email, "password": password}
|
||
finally:
|
||
if page:
|
||
try:
|
||
page.quit()
|
||
except:
|
||
pass
|
||
cleanup_chrome_processes()
|
||
|
||
|
||
def run_single_registration_api(progress_callback=None, step_callback=None, proxy: str = None) -> dict:
|
||
"""执行单次注册流程 - 协议模式 (API + Cookie 注入浏览器)
|
||
|
||
Args:
|
||
progress_callback: 进度回调函数 (message: str)
|
||
step_callback: 步骤回调函数 (step: str)
|
||
proxy: 代理地址
|
||
|
||
Returns:
|
||
dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str}
|
||
"""
|
||
def log_cb(msg):
|
||
if progress_callback:
|
||
progress_callback(msg)
|
||
else:
|
||
log_progress(msg)
|
||
|
||
def step_cb(step):
|
||
if step_callback:
|
||
step_callback(step)
|
||
|
||
# 检查协议模式是否可用
|
||
if not API_MODE_AVAILABLE:
|
||
return {"success": False, "error": "协议模式不可用,请安装 curl_cffi: pip install curl_cffi"}
|
||
|
||
# 检查必要配置
|
||
if not MAIL_API_TOKEN or not MAIL_API_BASE:
|
||
return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"}
|
||
|
||
# 检查域名
|
||
domains = get_email_domains()
|
||
if not domains:
|
||
return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"}
|
||
|
||
# 检查 IBAN
|
||
ibans = get_sepa_ibans()
|
||
if not ibans:
|
||
return {"success": False, "error": "没有可用的 IBAN,请先通过 /iban_add 导入"}
|
||
|
||
step_cb("生成账号信息...")
|
||
|
||
# 生成账号信息
|
||
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
|
||
email_domain = random.choice(domains)
|
||
email = f"{random_str}{email_domain}"
|
||
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
|
||
''.join(random.choices(string.ascii_lowercase, k=8)) + \
|
||
''.join(random.choices(string.digits, k=2)) + \
|
||
random.choice('!@#$%')
|
||
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
|
||
|
||
# 生成生日
|
||
year = random.randint(2000, 2004)
|
||
month = random.randint(1, 12)
|
||
if month in [1, 3, 5, 7, 8, 10, 12]:
|
||
max_day = 31
|
||
elif month in [4, 6, 9, 11]:
|
||
max_day = 30
|
||
else:
|
||
max_day = 29 if year % 4 == 0 else 28
|
||
day = random.randint(1, max_day)
|
||
birthdate = f"{year}-{month:02d}-{day:02d}"
|
||
|
||
log_status("初始化", f"生成账号: {email}")
|
||
log_status("初始化", f"设置密码: {password}")
|
||
log_status("初始化", f"姓名: {real_name} | 生日: {birthdate}")
|
||
log_status("模式", "协议模式 (API + Cookie 注入)")
|
||
|
||
# 使用配置的代理或传入的代理
|
||
use_proxy = proxy or API_PROXY or None
|
||
if use_proxy:
|
||
log_status("代理", f"使用代理: {use_proxy}")
|
||
|
||
try:
|
||
# 阶段 1: API 注册
|
||
step_cb("API 快速注册...")
|
||
log_status("阶段 1", "========== API 快速注册 ==========")
|
||
|
||
reg = api_register_flow(
|
||
email=email,
|
||
password=password,
|
||
real_name=real_name,
|
||
birthdate=birthdate,
|
||
mail_api_base=MAIL_API_BASE,
|
||
mail_api_token=MAIL_API_TOKEN,
|
||
proxy=use_proxy,
|
||
progress_callback=log_cb
|
||
)
|
||
|
||
if not reg:
|
||
log_status("失败", "API 注册失败")
|
||
return {"success": False, "error": "API 注册失败", "account": email, "password": password}
|
||
|
||
log_status("完成", "[OK] API 注册成功!")
|
||
|
||
# 阶段 2: Cookie 注入浏览器 + 支付
|
||
step_cb("Cookie 注入浏览器...")
|
||
log_status("阶段 2", "========== Cookie 注入浏览器 + 订阅支付 ==========")
|
||
|
||
result = browser_pay_with_cookies(
|
||
reg=reg,
|
||
email=email,
|
||
proxy=use_proxy,
|
||
headless=True,
|
||
step_callback=step_cb
|
||
)
|
||
|
||
if result and result.get("stopped"):
|
||
log_status("停止", "[!] 注册被用户停止")
|
||
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
|
||
elif result and result.get("token"):
|
||
step_cb("注册成功!")
|
||
log_status("完成", "[OK] 全部流程完成!")
|
||
return {
|
||
"success": True,
|
||
"account": email,
|
||
"password": password,
|
||
"token": result["token"],
|
||
"account_id": result.get("account_id", "")
|
||
}
|
||
|
||
# 如果 Cookie 注入失败,尝试 API 登录方式
|
||
log_status("重试", "Cookie 注入失败,尝试 API 登录...")
|
||
step_cb("尝试 API 登录...")
|
||
|
||
reg2 = api_login_flow(
|
||
email=email,
|
||
password=password,
|
||
proxy=use_proxy,
|
||
progress_callback=log_cb
|
||
)
|
||
|
||
if reg2:
|
||
result = browser_pay_with_cookies(
|
||
reg=reg2,
|
||
email=email,
|
||
proxy=use_proxy,
|
||
headless=True,
|
||
step_callback=step_cb
|
||
)
|
||
|
||
if result and result.get("token"):
|
||
step_cb("注册成功!")
|
||
log_status("完成", "[OK] 全部流程完成!")
|
||
return {
|
||
"success": True,
|
||
"account": email,
|
||
"password": password,
|
||
"token": result["token"],
|
||
"account_id": result.get("account_id", "")
|
||
}
|
||
|
||
log_status("失败", "注册成功但支付/获取token失败")
|
||
return {"success": False, "error": "支付流程失败", "account": email, "password": password}
|
||
|
||
except Exception as e:
|
||
error_msg = str(e)
|
||
# 只有连接断开才认为是停止请求
|
||
if _is_connection_lost(error_msg):
|
||
log_status("停止", "[!] 浏览器连接断开")
|
||
return {"success": False, "error": "浏览器连接断开", "stopped": True, "account": email, "password": password}
|
||
log_status("错误", f"注册异常: {e}")
|
||
return {"success": False, "error": str(e), "account": email, "password": password}
|
||
finally:
|
||
cleanup_chrome_processes()
|
||
|
||
|
||
def run_single_registration_auto(progress_callback=None, step_callback=None, mode: str = None) -> dict:
|
||
"""自动选择模式执行注册
|
||
|
||
Args:
|
||
progress_callback: 进度回调
|
||
step_callback: 步骤回调
|
||
mode: 强制指定模式 ("browser" / "api"),None 则使用配置
|
||
|
||
Returns:
|
||
dict: 注册结果
|
||
"""
|
||
use_mode = mode or REGISTER_MODE
|
||
|
||
if use_mode == "api":
|
||
if not API_MODE_AVAILABLE:
|
||
log_status("警告", "协议模式不可用,回退到浏览器模式")
|
||
return run_single_registration(progress_callback, step_callback)
|
||
return run_single_registration_api(progress_callback, step_callback)
|
||
else:
|
||
return run_single_registration(progress_callback, step_callback)
|
||
|
||
|
||
def get_register_mode() -> str:
|
||
"""获取当前注册模式"""
|
||
return REGISTER_MODE
|
||
|
||
|
||
def set_register_mode(mode: str) -> bool:
|
||
"""设置注册模式 (运行时)
|
||
|
||
Args:
|
||
mode: "browser" 或 "api"
|
||
|
||
Returns:
|
||
bool: 是否设置成功
|
||
"""
|
||
global REGISTER_MODE
|
||
if mode in ("browser", "api"):
|
||
if mode == "api" and not API_MODE_AVAILABLE:
|
||
return False
|
||
REGISTER_MODE = mode
|
||
return True
|
||
return False
|
||
|
||
|
||
def is_api_mode_supported() -> bool:
|
||
"""检查协议模式是否支持"""
|
||
return API_MODE_AVAILABLE
|
||
|
||
|
||
if __name__ == "__main__":
|
||
run_main_process()
|