Files
codexTool/auto_gpt_team.py
2026-01-24 06:51:28 +08:00

1568 lines
60 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Author: muyyg
Project: Subscription Automation (DrissionPage Version)
Created: 2026-01-12
Version: 3.0-drission
"""
import time
import random
import string
import re
import sys
import os
import platform
import subprocess
import requests
from pathlib import Path
from DrissionPage import ChromiumPage, ChromiumOptions
# ================= 配置加载 =================
try:
import tomllib
except ImportError:
try:
import tomli as tomllib
except ImportError:
tomllib = None
BASE_DIR = Path(__file__).parent
CONFIG_FILE = BASE_DIR / "config.toml"
def _load_config():
"""从 config.toml 加载配置"""
if tomllib is None or not CONFIG_FILE.exists():
return {}
try:
with open(CONFIG_FILE, "rb") as f:
return tomllib.load(f)
except Exception:
return {}
_cfg = _load_config()
_autogptplus = _cfg.get("autogptplus", {})
# ================= 核心配置区域 =================
# 从 config.toml [autogptplus] 读取,如未配置则使用默认值
# 1. 管理员 Token
MAIL_API_TOKEN = _autogptplus.get("mail_api_token", "")
# 2. 你的域名后缀(随机选择)
EMAIL_DOMAINS = _autogptplus.get("email_domains", [])
# 3. Cloud-Mail 部署地址
MAIL_API_BASE = _autogptplus.get("mail_api_base", "")
# 4. 接口路径 (邮件查询)
MAIL_API_PATH = "/api/public/emailList"
# 5. SEPA IBAN 列表 (从配置文件读取)
SEPA_IBANS = _autogptplus.get("sepa_ibans", [])
# 6. 随机指纹开关
RANDOM_FINGERPRINT = _autogptplus.get("random_fingerprint", True)
# ================= 浏览器指纹 =================
FINGERPRINTS = [
# NVIDIA 显卡
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1440}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4080 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 3840, "height": 2160}
},
# AMD 显卡
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (AMD)",
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6800 XT Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1440}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (AMD)",
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 7900 XTX Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 3840, "height": 2160}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (AMD)",
"webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6700 XT Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
# Intel 显卡
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (Intel)",
"webgl_renderer": "ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (Intel)",
"webgl_renderer": "ANGLE (Intel, Intel(R) Iris Xe Graphics Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (Intel)",
"webgl_renderer": "ANGLE (Intel, Intel(R) Arc A770 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1440}
},
# 笔记本配置
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3050 Laptop GPU Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
},
{
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 Laptop GPU Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 2560, "height": 1600}
},
]
def get_random_fingerprint() -> dict:
"""随机获取一个浏览器指纹"""
return random.choice(FINGERPRINTS)
def inject_fingerprint(page, fingerprint: dict):
"""注入浏览器指纹伪装脚本"""
try:
webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. (NVIDIA)")
webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)")
plat = fingerprint.get("platform", "Win32")
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
js_script = f'''
// 伪装 WebGL 指纹
const getParameterProxyHandler = {{
apply: function(target, thisArg, args) {{
const param = args[0];
if (param === 37445) {{ return "{webgl_vendor}"; }}
if (param === 37446) {{ return "{webgl_renderer}"; }}
return Reflect.apply(target, thisArg, args);
}}
}};
const originalGetParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler);
if (typeof WebGL2RenderingContext !== 'undefined') {{
const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter;
WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler);
}}
// 伪装 platform
Object.defineProperty(navigator, 'platform', {{ get: () => "{plat}" }});
// 伪装屏幕分辨率
Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]} }});
Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]} }});
Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]} }});
Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]} }});
// 隐藏 webdriver 特征
Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined }});
// 伪装 languages
Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"] }});
// 伪装 plugins
Object.defineProperty(navigator, 'plugins', {{
get: () => [
{{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }},
{{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }},
{{ name: "Native Client", filename: "internal-nacl-plugin" }}
]
}});
'''
page.run_js(js_script)
log_status("指纹", f"已注入: {webgl_renderer[:40]}...")
except Exception as e:
log_status("指纹", f"注入失败: {e}")
# ================= IBAN 管理函数 =================
IBAN_FILE = BASE_DIR / "sepa_ibans.txt"
def load_ibans_from_file():
"""从文件加载 IBAN 列表"""
if not IBAN_FILE.exists():
return []
try:
with open(IBAN_FILE, "r", encoding="utf-8") as f:
ibans = [line.strip() for line in f if line.strip() and line.strip().startswith("DE")]
return ibans
except Exception:
return []
def save_ibans_to_file(ibans: list):
"""保存 IBAN 列表到文件"""
try:
with open(IBAN_FILE, "w", encoding="utf-8") as f:
f.write("\n".join(ibans))
return True
except Exception:
return False
def get_sepa_ibans():
"""获取 SEPA IBAN 列表 (优先从文件读取)"""
file_ibans = load_ibans_from_file()
if file_ibans:
return file_ibans
return SEPA_IBANS
def add_sepa_ibans(new_ibans: list) -> tuple:
"""添加 IBAN 到列表
Args:
new_ibans: 新的 IBAN 列表
Returns:
tuple: (添加数量, 跳过数量, 当前总数)
"""
current = set(load_ibans_from_file())
added = 0
skipped = 0
for iban in new_ibans:
iban = iban.strip().upper()
if not iban or not iban.startswith("DE"):
continue
if iban in current:
skipped += 1
else:
current.add(iban)
added += 1
save_ibans_to_file(sorted(current))
return added, skipped, len(current)
def clear_sepa_ibans():
"""清空 IBAN 列表"""
if IBAN_FILE.exists():
IBAN_FILE.unlink()
return True
# ================= 域名管理函数 =================
DOMAIN_FILE = BASE_DIR / "email_domains.txt"
def load_domains_from_file():
"""从文件加载域名列表"""
if not DOMAIN_FILE.exists():
return []
try:
with open(DOMAIN_FILE, "r", encoding="utf-8") as f:
domains = [line.strip() for line in f if line.strip() and line.strip().startswith("@")]
return domains
except Exception:
return []
def save_domains_to_file(domains: list):
"""保存域名列表到文件"""
try:
with open(DOMAIN_FILE, "w", encoding="utf-8") as f:
f.write("\n".join(domains))
return True
except Exception:
return False
def get_email_domains():
"""获取邮箱域名列表 (合并文件和配置)"""
file_domains = set(load_domains_from_file())
config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set()
# 合并两个来源的域名
all_domains = file_domains | config_domains
return sorted(all_domains) if all_domains else []
def add_email_domains(new_domains: list) -> tuple:
"""添加域名到列表
Args:
new_domains: 新的域名列表
Returns:
tuple: (添加数量, 跳过数量, 当前总数)
"""
# 获取当前所有域名(文件 + 配置)
current = set(load_domains_from_file())
config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set()
all_existing = current | config_domains
added = 0
skipped = 0
for domain in new_domains:
domain = domain.strip().lower()
# 确保以 @ 开头
if not domain.startswith("@"):
domain = "@" + domain
if not domain or len(domain) < 4: # 至少 @x.y
continue
if domain in all_existing:
skipped += 1
else:
current.add(domain)
all_existing.add(domain)
added += 1
# 只保存通过 Bot 添加的域名到文件
save_domains_to_file(sorted(current))
return added, skipped, len(all_existing)
def remove_email_domain(domain: str) -> bool:
"""删除指定域名 (只能删除通过 Bot 添加的域名)
Args:
domain: 要删除的域名
Returns:
bool: 是否删除成功
"""
current = set(load_domains_from_file())
domain = domain.strip().lower()
if not domain.startswith("@"):
domain = "@" + domain
if domain in current:
current.remove(domain)
save_domains_to_file(sorted(current))
return True
return False
def clear_email_domains():
"""清空域名列表"""
if DOMAIN_FILE.exists():
DOMAIN_FILE.unlink()
return True
# ================= 固定配置 =================
TARGET_URL = "https://chatgpt.com"
def generate_random_birthday():
"""生成随机生日 (2000-2004年)"""
year = random.randint(2000, 2004)
month = random.randint(1, 12)
# 根据月份确定天数
if month in [1, 3, 5, 7, 8, 10, 12]:
max_day = 31
elif month in [4, 6, 9, 11]:
max_day = 30
else: # 2月
max_day = 29 if year % 4 == 0 else 28
day = random.randint(1, max_day)
return str(year), f"{month:02d}", f"{day:02d}"
# 地址格式: (街道, 邮编, 城市)
SEPA_ADDRESSES = [
# 柏林
("Alexanderplatz 1", "10178", "Berlin"),
("Unter den Linden 77", "10117", "Berlin"),
("Kurfürstendamm 21", "10719", "Berlin"),
("Friedrichstraße 43", "10117", "Berlin"),
("Potsdamer Platz 1", "10785", "Berlin"),
("Tauentzienstraße 9", "10789", "Berlin"),
# 慕尼黑
("Marienplatz 8", "80331", "München"),
("Leopoldstraße 32", "80802", "München"),
("Maximilianstraße 17", "80539", "München"),
("Kaufingerstraße 28", "80331", "München"),
("Sendlinger Straße 3", "80331", "München"),
# 汉堡
("Mönckebergstraße 16", "20095", "Hamburg"),
("Jungfernstieg 38", "20354", "Hamburg"),
("Spitalerstraße 12", "20095", "Hamburg"),
("Neuer Wall 50", "20354", "Hamburg"),
# 法兰克福
("Zeil 106", "60313", "Frankfurt am Main"),
("Kaiserstraße 62", "60329", "Frankfurt am Main"),
("Goethestraße 1", "60313", "Frankfurt am Main"),
("Große Bockenheimer Str. 2", "60313", "Frankfurt am Main"),
# 科隆
("Hohe Straße 111", "50667", "Köln"),
("Schildergasse 24", "50667", "Köln"),
("Breite Straße 80", "50667", "Köln"),
# 斯图加特
("Königstraße 2", "70173", "Stuttgart"),
("Calwer Straße 19", "70173", "Stuttgart"),
("Schulstraße 5", "70173", "Stuttgart"),
# 杜塞尔多夫
("Königsallee 60", "40212", "Düsseldorf"),
("Schadowstraße 11", "40212", "Düsseldorf"),
("Flinger Straße 36", "40213", "Düsseldorf"),
# 莱比锡
("Grimmaische Straße 25", "04109", "Leipzig"),
("Petersstraße 36", "04109", "Leipzig"),
# 德累斯顿
("Prager Straße 12", "01069", "Dresden"),
("Altmarkt 10", "01067", "Dresden"),
# 纽伦堡
("Karolinenstraße 12", "90402", "Nürnberg"),
("Breite Gasse 23", "90402", "Nürnberg"),
# 汉诺威
("Georgstraße 10", "30159", "Hannover"),
("Bahnhofstraße 5", "30159", "Hannover"),
# 不来梅
("Obernstraße 2", "28195", "Bremen"),
("Sögestraße 18", "28195", "Bremen"),
]
FIRST_NAMES = [
# 德国常见男性名
"Lukas", "Leon", "Maximilian", "Felix", "Paul", "Jonas", "Tim", "David",
"Niklas", "Jan", "Philipp", "Moritz", "Alexander", "Sebastian", "Florian",
"Julian", "Tobias", "Simon", "Daniel", "Christian", "Markus", "Thomas",
"Michael", "Stefan", "Andreas", "Martin", "Matthias", "Benjamin", "Patrick",
# 德国常见女性名
"Anna", "Laura", "Julia", "Lena", "Sarah", "Lisa", "Marie", "Sophie",
"Katharina", "Hannah", "Emma", "Mia", "Lea", "Johanna", "Clara",
"Charlotte", "Emilia", "Luisa", "Nina", "Elena", "Melanie", "Christina",
"Sandra", "Nicole", "Sabine", "Claudia", "Petra", "Monika", "Stefanie",
]
LAST_NAMES = [
# 德国最常见姓氏
"Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner",
"Becker", "Schulz", "Hoffmann", "Schäfer", "Koch", "Bauer", "Richter",
"Klein", "Wolf", "Schröder", "Neumann", "Schwarz", "Zimmermann", "Braun",
"Krüger", "Hofmann", "Hartmann", "Lange", "Schmitt", "Werner", "Schmitz",
"Krause", "Meier", "Lehmann", "Schmid", "Schulze", "Maier", "Köhler",
"Herrmann", "König", "Walter", "Mayer", "Huber", "Kaiser", "Fuchs",
"Peters", "Lang", "Scholz", "Möller", "Weiß", "Jung", "Hahn", "Schubert",
]
# ================= 工具函数 =================
def cleanup_chrome_processes():
"""清理残留的 Chrome 进程 (跨平台支持)"""
try:
if platform.system() == "Windows":
# Windows: 使用 taskkill 清理 chromedriver 和 chrome
try:
subprocess.run(
['taskkill', '/F', '/IM', 'chromedriver.exe'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程 (带 --headless 参数的)
try:
result = subprocess.run(
['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
pid = line.strip()
if pid.isdigit():
subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
except Exception:
pass
log_status("清理", "已清理 Chrome 残留进程")
else:
# Linux/Mac: 使用 pkill
try:
subprocess.run(
['pkill', '-f', 'chromedriver'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程
try:
subprocess.run(
['pkill', '-f', 'chrome.*--headless'],
capture_output=True, timeout=5
)
except Exception:
pass
log_status("清理", "已清理 Chrome 残留进程")
except Exception:
pass # 静默处理,不影响主流程
def log_status(step, message):
timestamp = time.strftime("%H:%M:%S")
print(f"[{timestamp}] [{step}] {message}")
sys.stdout.flush()
def log_progress(message):
print(f" -> {message}")
sys.stdout.flush()
def save_account(email, password, token, account_id=""):
"""保存账号信息到 JSON 文件"""
import json
accounts_file = "accounts.json"
# 读取现有账号
accounts = []
try:
with open(accounts_file, 'r', encoding='utf-8') as f:
accounts = json.load(f)
except:
pass
# 添加新账号
account_data = {
"account": email,
"password": password,
"token": token
}
if account_id:
account_data["account_id"] = account_id
accounts.append(account_data)
# 保存
with open(accounts_file, 'w', encoding='utf-8') as f:
json.dump(accounts, f, ensure_ascii=False, indent=2)
log_status("保存", f"账号已保存到 {accounts_file}")
def get_verification_content(target_email, max_retries=90):
log_status("API监听", f"正在监听邮件 ({MAIL_API_PATH})...")
headers = {
"Authorization": MAIL_API_TOKEN,
"Content-Type": "application/json"
}
start_time = time.time()
for i in range(max_retries):
elapsed = int(time.time() - start_time)
try:
url = f"{MAIL_API_BASE}{MAIL_API_PATH}"
payload = {
"toEmail": target_email,
"timeSort": "desc",
"size": 20
}
resp = requests.post(url, headers=headers, json=payload, timeout=10)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 200:
mails = data.get('data', [])
if mails:
for mail in mails:
log_status("捕获", "✅ 成功抓取到目标邮件!")
html_body = mail.get('content') or mail.get('text') or str(mail)
# 提取验证码
code_match = re.search(r'\b(\d{6})\b', html_body)
if code_match:
code = code_match.group(1)
log_status("解析", f"提取到验证码: {code}")
return {"type": "code", "val": code}
# 提取链接
link_match = re.search(r'href="(https://.*openai\.com/.*verification.*)"', html_body)
if not link_match:
link_match = re.search(r'href="(https://auth0\.openai\.com/u/login/identifier\?state=[^"]+)"', html_body)
if link_match:
link = link_match.group(1)
log_status("解析", f"提取到链接: {link[:30]}...")
return {"type": "link", "val": link}
except:
pass
if i % 5 == 0:
print(f" [监听中] 已耗时 {elapsed}秒...")
sys.stdout.flush()
time.sleep(2)
log_status("超时", "❌ 未能获取验证码。")
return None
def run_payment_flow(page, email, step_callback=None):
"""执行 SEPA 支付流程 - 严格按顺序执行,任一步骤失败则终止
Args:
page: 浏览器页面对象
email: 邮箱地址
step_callback: 步骤回调函数 (step: str)
"""
def step_cb(step):
if step_callback:
step_callback(step)
log_status("支付流程", "开始处理 Stripe 支付页...")
try:
# 等待支付页加载完成
step_cb("等待支付页加载...")
log_status("支付页", "等待支付页加载...")
# 等待页面稳定Stripe 支付页需要时间渲染)
time.sleep(3)
# 尝试多种方式定位邮箱输入框
email_input = None
for attempt in range(3):
# 方式1: 直接 ID
email_input = page.ele('#email', timeout=5)
if email_input:
break
# 方式2: name 属性
email_input = page.ele('@name=email', timeout=3)
if email_input:
break
# 方式3: CSS 选择器
email_input = page.ele('css:input[type="email"], input[id*="email"], input[name*="email"]', timeout=3)
if email_input:
break
log_progress(f"[尝试{attempt+1}] 等待邮箱输入框...")
time.sleep(2)
if email_input:
log_progress("✓ 支付页已加载")
else:
log_progress("⚠ 邮箱输入框未立即出现,继续尝试...")
time.sleep(3)
# 随机选择 IBAN 和地址
ibans = get_sepa_ibans()
if not ibans:
log_progress("❌ 没有可用的 IBAN请先通过 Bot 导入")
return None
sepa_iban = random.choice(ibans)
street, postal_code, city = random.choice(SEPA_ADDRESSES)
account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
log_progress(f"使用 IBAN: {sepa_iban[:8]}...")
log_progress(f"使用地址: {street}, {postal_code} {city}")
log_progress(f"账户名: {account_name}")
# ========== 步骤 1: 填写邮箱 ==========
step_cb("填写支付邮箱...")
log_progress("[步骤1] 填写邮箱...")
try:
# 如果之前已经找到了,直接使用;否则重新查找
if not email_input:
email_input = page.ele('#email', timeout=10)
if not email_input:
email_input = page.ele('@name=email', timeout=5)
if not email_input:
email_input = page.ele('css:input[type="email"]', timeout=5)
if not email_input:
log_progress("❌ 邮箱输入框未找到")
log_progress(f"当前URL: {page.url}")
return None
email_input.clear()
email_input.input(email)
log_progress(f"✓ 已填写邮箱: {email}")
time.sleep(1)
except Exception as e:
log_progress(f"❌ 邮箱填写失败: {e}")
log_progress(f"当前URL: {page.url}")
return None
# ========== 步骤 2: 选择 SEPA ==========
step_cb("选择 SEPA 支付方式...")
log_progress("[步骤2] 选择 SEPA 直接借记...")
time.sleep(2)
sepa_clicked = False
# 定位方式(按速度排序:属性选择器 > CSS > xpath
sepa_selectors = [
'@data-testid=sepa_debit-accordion-item-button', # 最快:属性选择器
'css:button[data-testid*="sepa"]', # 快CSS 模糊匹配
'xpath://button[contains(., "SEPA")]', # 备用xpath 文本匹配
]
for selector in sepa_selectors:
try:
sepa_btn = page.ele(selector, timeout=2)
if sepa_btn:
page.run_js('arguments[0].click()', sepa_btn)
sepa_clicked = True
log_progress("✓ 已点击 SEPA 按钮")
time.sleep(2)
break
except:
continue
if not sepa_clicked:
# 最后尝试 JS
try:
result = page.run_js('''
const btns = document.querySelectorAll('button');
for(let btn of btns) {
if(btn.innerText.includes('SEPA')) {
btn.click();
return true;
}
}
return false;
''')
if result:
sepa_clicked = True
log_progress("✓ 已点击 SEPA (JS)")
time.sleep(2)
except:
pass
if not sepa_clicked:
log_progress("❌ SEPA 选择失败")
return None
# 验证 SEPA 是否真正展开(检查 IBAN 输入框是否出现)
log_progress("验证 SEPA 是否展开...")
time.sleep(2)
try:
iban_check = page.ele('#iban', timeout=5)
if not iban_check:
log_progress("❌ SEPA 未展开IBAN 输入框未出现")
return None
log_progress("✓ SEPA 已展开IBAN 输入框已出现")
except:
log_progress("❌ SEPA 未展开IBAN 输入框未出现")
return None
# ========== 步骤 3: 填写 IBAN ==========
step_cb("填写 IBAN...")
log_progress("[步骤3] 填写 IBAN...")
try:
# 优先使用 #iban (Stripe 标准 id),更快
iban_input = page.ele('#iban', timeout=5)
if not iban_input:
iban_input = page.ele('@name=iban', timeout=3)
if not iban_input:
log_progress("❌ IBAN 输入框未找到")
return None
iban_input.input(sepa_iban)
log_progress(f"✓ 已填写 IBAN: {sepa_iban}")
time.sleep(1)
except Exception as e:
log_progress(f"❌ IBAN 填写失败: {e}")
return None
# ========== 步骤 4: 填写账户姓名 ==========
step_cb("填写账户姓名...")
log_progress("[步骤4] 填写账户姓名...")
try:
# 优先使用 billingName (Stripe 支付页面标准 id)
name_input = page.ele('#billingName', timeout=5)
if not name_input:
name_input = page.ele('@name=billingName', timeout=3)
if not name_input:
name_input = page.ele('xpath://input[@name="name" or contains(@id, "name") or contains(@placeholder, "姓名")]', timeout=3)
if not name_input:
log_progress("❌ 姓名输入框未找到")
return None
name_input.input(account_name)
log_progress(f"✓ 已填写账户姓名: {account_name}")
time.sleep(1)
except Exception as e:
log_progress(f"❌ 账户姓名填写失败: {e}")
return None
# ========== 步骤 5: 填写地址 ==========
step_cb("填写账单地址...")
log_progress("[步骤5] 填写地址...")
try:
# 检查是否需要点击"手动输入地址"(仅当地址输入框不存在时)
addr_input = page.ele('#billingAddressLine1', timeout=1)
if not addr_input:
# 尝试点击手动输入地址按钮
try:
manual_btn = page.ele('@data-testid=manual-address-entry', timeout=1)
if manual_btn:
manual_btn.click()
time.sleep(0.5)
except:
pass
addr_input = page.ele('#billingAddressLine1', timeout=3)
if not addr_input:
log_progress("❌ 地址输入框未找到")
return None
# 一次性填写所有地址字段
addr_input.input(street)
log_progress(f"✓ 已填写地址: {street}")
postal_input = page.ele('#billingPostalCode', timeout=1)
if postal_input:
postal_input.input(postal_code)
log_progress(f"✓ 已填写邮编: {postal_code}")
city_input = page.ele('#billingLocality', timeout=1)
if city_input:
city_input.input(city)
log_progress(f"✓ 已填写城市: {city}")
# 关闭 Google 地址建议弹窗
page.actions.key_down('Escape').key_up('Escape')
time.sleep(0.3)
page.run_js('document.body.click()')
except Exception as e:
log_progress(f"❌ 地址填写失败: {e}")
return None
# ========== 步骤 6: 勾选条款 ==========
step_cb("勾选服务条款...")
log_progress("[步骤6] 勾选条款...")
try:
terms_checkbox = page.ele('#termsOfServiceConsentCheckbox', timeout=5)
if terms_checkbox:
terms_checkbox.click()
log_progress("✓ 已勾选条款")
time.sleep(1)
except Exception as e:
log_progress(f"⚠ 条款勾选失败(可能已勾选): {e}")
# ========== 步骤 7: 点击订阅 ==========
step_cb("提交订阅...")
log_progress("[步骤7] 点击订阅按钮...")
time.sleep(2)
subscribe_processing = False
# 尝试点击订阅按钮并验证是否进入处理状态
for attempt in range(3):
try:
subscribe_btn = page.ele('css:button[type="submit"]', timeout=5)
if subscribe_btn:
subscribe_btn.click()
log_progress(f"[尝试{attempt+1}] 已点击订阅按钮,等待处理状态...")
# 等待按钮变成"正在处理"状态(检测 disabled 属性或 spinner
for _ in range(10):
time.sleep(0.5)
try:
# 检查按钮是否被禁用(处理中)
btn_disabled = page.run_js('''
const btn = document.querySelector('button[type="submit"]');
if (!btn) return false;
return btn.disabled || btn.classList.contains('processing') ||
btn.querySelector('.spinner, .loading, svg') !== null;
''')
if btn_disabled:
subscribe_processing = True
log_progress("✓ 订阅按钮已进入处理状态")
break
# 检查 URL 是否已经变化(支付成功)
if 'success' in page.url:
subscribe_processing = True
log_progress("✓ 已检测到支付成功")
break
except:
pass
if subscribe_processing:
break
except:
pass
if not subscribe_processing:
# JS 备用点击
try:
page.run_js('document.querySelector("button[type=submit]").click()')
time.sleep(2)
except:
pass
if not subscribe_processing:
log_progress("⚠ 未检测到处理状态,继续等待支付结果...")
# ========== 步骤 8: 等待支付成功 ==========
step_cb("等待支付处理...")
log_status("等待", "等待支付处理超时60秒...")
try:
page.wait.url_change('payments/success-team', timeout=60)
log_status("成功", "✓ 支付成功!")
except:
log_status("超时", "❌ 支付未在60秒内完成")
return None
# ========== 步骤 9: 获取 token 和 account_id ==========
step_cb("获取 Token...")
log_status("获取", "正在获取 access token...")
time.sleep(2)
page.get("https://chatgpt.com/api/auth/session")
time.sleep(2)
try:
# 获取页面内容JSON
session_text = page.ele('tag:pre', timeout=5).text
import json
session_data = json.loads(session_text)
access_token = session_data.get('accessToken', '')
if access_token:
log_status("成功", f"获取到 token: {access_token[:50]}...")
# 获取 account_id
step_cb("获取 Account ID...")
account_id = fetch_account_id(page, access_token)
return {
"token": access_token,
"account_id": account_id
}
else:
log_progress("未找到 accessToken")
return None
except Exception as e:
log_progress(f"获取 token 失败: {e}")
return None
except Exception as e:
log_status("错误", f"[X] 支付流程异常: {e}")
return None
def fetch_account_id(page, access_token: str) -> str:
"""通过 API 获取 account_id"""
log_status("获取", "正在获取 account_id...")
try:
page.get("https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27")
time.sleep(2)
# 使用 JS 请求 API
result = page.run_js(f'''
return fetch("https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27", {{
headers: {{
"Authorization": "Bearer {access_token}",
"Content-Type": "application/json"
}}
}})
.then(r => r.json())
.then(data => JSON.stringify(data))
.catch(e => "error:" + e);
''')
if result and not result.startswith("error:"):
import json
data = json.loads(result)
accounts = data.get("accounts", {})
# 优先查找 Team 账户
for acc_id, acc_info in accounts.items():
if acc_id == "default":
continue
account_data = acc_info.get("account", {})
plan_type = account_data.get("plan_type", "")
if "team" in plan_type.lower():
log_status("成功", f"获取到 account_id: {acc_id[:8]}...")
return acc_id
# 取第一个非 default 的
for acc_id in accounts.keys():
if acc_id != "default":
log_status("成功", f"获取到 account_id: {acc_id[:8]}...")
return acc_id
except Exception as e:
log_progress(f"获取 account_id 失败: {e}")
return ""
def run_main_process():
# 检查必要配置
if not MAIL_API_TOKEN or not MAIL_API_BASE or not EMAIL_DOMAINS:
print("\n" + "="*60)
print("❌ 配置错误: 请在 config.toml 中配置 [autogptplus] 段")
print(" - mail_api_token: Cloud Mail API Token")
print(" - mail_api_base: Cloud Mail API 地址")
print(" - email_domains: 可用邮箱域名列表")
print("="*60 + "\n")
return
# 清理可能残留的 Chrome 调试进程
cleanup_chrome_processes()
# === 1. 生成 15 位随机账号 + 同密码 ===
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
domains = get_email_domains()
if not domains:
print("\n" + "="*60)
print("❌ 配置错误: 没有可用的邮箱域名")
print(" 请在 config.toml 中配置 email_domains 或通过 Bot 添加")
print("="*60 + "\n")
return
email_domain = random.choice(domains)
email = f"{random_str}{email_domain}"
# 生成符合要求的密码:大小写字母+数字+特殊字符至少12位
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
''.join(random.choices(string.ascii_lowercase, k=8)) + \
''.join(random.choices(string.digits, k=2)) + \
random.choice('!@#$%')
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
print("\n" + "="*60)
log_status("初始化", f"生成账号: {email}")
log_status("初始化", f"设置密码: {password}")
print("="*60 + "\n")
# 检测操作系统
is_linux = platform.system() == "Linux"
# 获取随机指纹
fingerprint = None
if RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
log_status("指纹", f"{fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}")
else:
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
log_status("指纹", "使用默认指纹 (RTX 3060)")
# 配置 DrissionPage - 与项目 browser_automation.py 保持一致
co = ChromiumOptions()
co.set_argument('--no-first-run') # 跳过首次运行
co.set_argument('--disable-infobars')
co.set_argument('--incognito') # 无痕模式
co.set_argument('--disable-gpu') # 减少资源占用
co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题
co.set_argument('--no-sandbox') # 服务器环境需要
co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征
co.set_argument('--lang=zh-CN') # 设置语言为中文简体
co.set_argument(f'--user-agent={fingerprint["user_agent"]}') # 设置 User-Agent
# Linux 服务器特殊配置
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
co.set_argument('--single-process') # 某些 Linux 环境需要
co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口
# 尝试查找 Chrome/Chromium 路径
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
log_status("浏览器", f"使用浏览器: {chrome_path}")
break
else:
co.auto_port(True) # Windows 使用自动分配端口
co.set_local_port(random.randint(19222, 29999)) # 备用:手动设置随机端口
# 无头模式 (服务器运行)
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
log_status("浏览器", f"正在启动浏览器 (无头模式, {'Linux' if is_linux else 'Windows'})...")
try:
page = ChromiumPage(co)
# 注入指纹伪装
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
except Exception as e:
log_status("浏览器", f"首次启动失败: {e},尝试清理后重试...")
# 清理残留进程后重试
cleanup_chrome_processes()
time.sleep(2)
# 重新配置
co2 = ChromiumOptions()
co2.set_argument('--no-first-run')
co2.set_argument('--disable-infobars')
co2.set_argument('--incognito')
co2.set_argument('--disable-gpu')
co2.set_argument('--disable-dev-shm-usage')
co2.set_argument('--no-sandbox')
co2.set_argument('--disable-blink-features=AutomationControlled')
co2.set_argument('--lang=zh-CN')
co2.set_argument(f'--user-agent={fingerprint["user_agent"]}')
co2.set_argument('--headless=new')
co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
if is_linux:
co2.set_argument('--disable-software-rasterizer')
co2.set_argument('--disable-extensions')
co2.set_argument('--disable-setuid-sandbox')
co2.set_argument('--single-process')
co2.set_argument('--remote-debugging-port=0')
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co2.set_browser_path(chrome_path)
break
else:
co2.set_local_port(random.randint(30000, 39999))
page = ChromiumPage(co2)
# 注入指纹伪装
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
try:
# === 注册流程 ===
log_status("步骤 1/5", "打开 ChatGPT 注册页...")
page.get(TARGET_URL)
# 使用 data-testid 定位登录按钮
login_btn = page.ele('@data-testid=login-button', timeout=30)
if not login_btn:
login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
login_btn.click()
log_progress("填入邮箱...")
email_input = page.ele('@name=email', timeout=30)
email_input.input(email)
page.ele('xpath://button[@type="submit"]').click()
log_progress("填入密码...")
password_input = page.ele('xpath://input[@type="password"]', timeout=30)
password_input.input(password)
page.ele('xpath://button[@type="submit"]').click()
log_status("步骤 2/5", "等待邮件 (All Mail)...")
# === 接码与验证 ===
verify_data = get_verification_content(email)
if verify_data:
log_status("步骤 3/5", "执行验证...")
if verify_data['type'] == 'link':
page.new_tab(verify_data['val'])
time.sleep(5)
elif verify_data['type'] == 'code':
code = verify_data['val']
log_progress(f"填入验证码: {code}")
try:
code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
code_input.input(code)
# 点击继续(使用 type=submit
time.sleep(1)
try:
log_progress("尝试点击继续按钮...")
continue_btn = page.ele('css:button[type="submit"]', timeout=5)
if continue_btn:
continue_btn.click()
except:
log_progress("未找到按钮或已自动跳转...")
except Exception as e:
log_progress(f"验证码填入异常: {e}")
# === 资料填写 (姓名+生日) ===
log_status("步骤 4/5", "进入信息填写页...")
try:
name_input = page.ele('@name=name', timeout=20)
name_input.input(real_name)
# 使用 Tab 键切换到生日字段
page.actions.key_down('Tab').key_up('Tab')
# 生成随机生日并输入
birth_year, birth_month, birth_day = generate_random_birthday()
birth_str = birth_year + birth_month + birth_day
log_progress(f"生日: {birth_year}-{birth_month}-{birth_day}")
for digit in birth_str:
page.actions.type(digit)
time.sleep(0.1)
time.sleep(1.5)
# 点击继续/完成注册(使用 type=submit
final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
final_reg_btn.click()
# =======================================================
# 【关键节点】等待进入主页后再执行 JS 跳转到支付页
# =======================================================
log_status("订阅", "等待进入 ChatGPT 主页...")
# 等待 URL 变成 chatgpt.com不含 auth 路径)
for _ in range(30):
current_url = page.url
if 'chatgpt.com' in current_url and 'auth' not in current_url and 'login' not in current_url:
log_progress(f"✓ 已进入主页: {current_url[:50]}...")
break
time.sleep(1)
else:
log_progress("⚠ 等待主页超时,尝试继续...")
# 额外等待页面稳定
time.sleep(3)
log_status("订阅", "执行 JS 跳转到支付页...")
# 直接执行 JS 跳转到支付页
checkout_js = '''
(async function(){
try {
const t = await(await fetch("/api/auth/session")).json();
if(!t.accessToken){
return "请先登录ChatGPT";
}
const p = {
plan_name: "chatgptteamplan",
team_plan_data: {
workspace_name: "Sepa",
price_interval: "month",
seat_quantity: 5
},
billing_details: {
country: "DE",
currency: "EUR"
},
promo_campaign: {
promo_campaign_id: "team-1-month-free",
is_coupon_from_query_param: true
},
checkout_ui_mode: "redirect"
};
const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
method: "POST",
headers: {
Authorization: "Bearer " + t.accessToken,
"Content-Type": "application/json"
},
body: JSON.stringify(p)
});
const d = await r.json();
if(d.url){
window.location.href = d.url;
return "success";
} else {
return "提取失败:" + (d.detail || JSON.stringify(d));
}
} catch(e) {
return "发生错误:" + e;
}
})();
'''
result = page.run_js(checkout_js)
log_progress(f"JS 执行结果: {result}")
# 等待跳转到支付页(使用 URL 检测代替固定等待)
try:
page.wait.url_change('pay.openai.com', timeout=15)
log_progress("✓ 已跳转到支付页")
except:
time.sleep(2) # 兜底等待
# 执行支付流程
result = run_payment_flow(page, email)
# 保存账号信息
if result and result.get("token"):
save_account(
email,
password,
result["token"],
result.get("account_id", "")
)
except Exception as e:
log_status("崩溃", f"注册/订阅转换阶段异常: {e}")
else:
log_status("失败", "未能找到验证码。")
except Exception as e:
log_status("崩溃", f"全局错误: {e}")
print("\n" + "="*60)
input("按回车键退出...")
page.quit()
def run_single_registration(progress_callback=None, step_callback=None) -> dict:
"""执行单次注册流程 (供 Bot 调用)
Args:
progress_callback: 进度回调函数 (message: str) - 用于日志
step_callback: 步骤回调函数 (step: str) - 用于更新 Bot 显示的当前步骤
Returns:
dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str}
"""
def log_cb(msg):
if progress_callback:
progress_callback(msg)
log_progress(msg)
def step_cb(step):
if step_callback:
step_callback(step)
# 检查必要配置
if not MAIL_API_TOKEN or not MAIL_API_BASE:
return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"}
# 检查域名
domains = get_email_domains()
if not domains:
return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"}
# 检查 IBAN
ibans = get_sepa_ibans()
if not ibans:
return {"success": False, "error": "没有可用的 IBAN请先通过 /iban_add 导入"}
step_cb("生成账号信息...")
# 生成账号信息
random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
email_domain = random.choice(domains)
email = f"{random_str}{email_domain}"
password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \
''.join(random.choices(string.ascii_lowercase, k=8)) + \
''.join(random.choices(string.digits, k=2)) + \
random.choice('!@#$%')
real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
log_status("初始化", f"生成账号: {email}")
log_status("初始化", f"设置密码: {password}")
# 检测操作系统
is_linux = platform.system() == "Linux"
step_cb("启动浏览器...")
# 获取随机指纹
fingerprint = None
if RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
log_status("指纹", f"已注入: {fingerprint['webgl_renderer'][:40]}...")
else:
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
# 配置浏览器
co = ChromiumOptions()
co.set_argument('--no-first-run')
co.set_argument('--disable-infobars')
co.set_argument('--incognito')
co.set_argument('--disable-gpu')
co.set_argument('--disable-dev-shm-usage')
co.set_argument('--no-sandbox')
co.set_argument('--disable-blink-features=AutomationControlled')
co.set_argument('--lang=zh-CN')
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
co.set_argument('--single-process')
co.set_argument('--remote-debugging-port=0')
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
break
else:
co.auto_port(True)
co.set_local_port(random.randint(19222, 29999))
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
page = None
try:
page = ChromiumPage(co)
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
# 注册流程
step_cb("打开注册页面...")
log_status("步骤 1/5", "打开 ChatGPT 注册页...")
page.get(TARGET_URL)
step_cb("点击登录按钮...")
login_btn = page.ele('@data-testid=login-button', timeout=30)
if not login_btn:
login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
login_btn.click()
step_cb("填写邮箱...")
log_progress("填入邮箱...")
email_input = page.ele('@name=email', timeout=30)
email_input.input(email)
page.ele('xpath://button[@type="submit"]').click()
step_cb("填写密码...")
log_progress("填入密码...")
password_input = page.ele('xpath://input[@type="password"]', timeout=30)
password_input.input(password)
page.ele('xpath://button[@type="submit"]').click()
step_cb("等待验证邮件...")
log_status("步骤 2/5", "等待邮件 (All Mail)...")
verify_data = get_verification_content(email)
if not verify_data:
return {"success": False, "error": "未能获取验证码", "account": email, "password": password}
step_cb("执行邮箱验证...")
log_status("步骤 3/5", "执行验证...")
if verify_data['type'] == 'link':
page.new_tab(verify_data['val'])
time.sleep(5)
elif verify_data['type'] == 'code':
code = verify_data['val']
log_progress(f"填入验证码: {code}")
code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
code_input.input(code)
time.sleep(1)
try:
log_progress("尝试点击继续按钮...")
continue_btn = page.ele('css:button[type="submit"]', timeout=5)
if continue_btn:
continue_btn.click()
except:
log_progress("未找到按钮或已自动跳转...")
# 资料填写
step_cb("填写个人信息...")
log_status("步骤 4/5", "进入信息填写页...")
name_input = page.ele('@name=name', timeout=20)
name_input.input(real_name)
page.actions.key_down('Tab').key_up('Tab')
birth_year, birth_month, birth_day = generate_random_birthday()
birth_str = birth_year + birth_month + birth_day
log_progress(f"生日: {birth_year}-{birth_month}-{birth_day}")
for digit in birth_str:
page.actions.type(digit)
time.sleep(0.1)
time.sleep(1.5)
final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
final_reg_btn.click()
# 等待进入主页
step_cb("等待进入主页...")
log_status("订阅", "等待进入 ChatGPT 主页...")
for _ in range(30):
current_url = page.url
if 'chatgpt.com' in current_url and 'auth' not in current_url and 'login' not in current_url:
log_progress(f"✓ 已进入主页: {current_url[:50]}...")
break
time.sleep(1)
else:
log_progress("⚠ 等待主页超时,尝试继续...")
time.sleep(3)
# 跳转到支付页
step_cb("跳转到支付页...")
log_status("订阅", "执行 JS 跳转到支付页...")
checkout_js = '''
(async function(){
try {
const t = await(await fetch("/api/auth/session")).json();
if(!t.accessToken){ return "请先登录ChatGPT"; }
const p = {
plan_name: "chatgptteamplan",
team_plan_data: { workspace_name: "Sepa", price_interval: "month", seat_quantity: 5 },
billing_details: { country: "DE", currency: "EUR" },
promo_campaign: { promo_campaign_id: "team-1-month-free", is_coupon_from_query_param: true },
checkout_ui_mode: "redirect"
};
const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
method: "POST",
headers: { Authorization: "Bearer " + t.accessToken, "Content-Type": "application/json" },
body: JSON.stringify(p)
});
const d = await r.json();
if(d.url){ window.location.href = d.url; return "success"; }
else { return "提取失败:" + (d.detail || JSON.stringify(d)); }
} catch(e) { return "发生错误:" + e; }
})();
'''
result = page.run_js(checkout_js)
log_progress(f"JS 执行结果: {result}")
try:
page.wait.url_change('pay.openai.com', timeout=15)
log_progress("✓ 已跳转到支付页")
except:
time.sleep(2)
# 执行支付流程
step_cb("执行 SEPA 支付...")
result = run_payment_flow(page, email, step_cb)
if result and result.get("token"):
step_cb("注册成功!")
log_status("完成", "✓ 注册成功!")
return {
"success": True,
"account": email,
"password": password,
"token": result["token"],
"account_id": result.get("account_id", "")
}
else:
log_status("失败", "注册失败: 支付流程失败")
return {"success": False, "error": "支付流程失败", "account": email, "password": password}
except Exception as e:
log_status("错误", f"注册异常: {e}")
return {"success": False, "error": str(e), "account": email, "password": password}
finally:
if page:
try:
page.quit()
except:
pass
cleanup_chrome_processes()
if __name__ == "__main__":
run_main_process()