""" Author: muyyg Project: Subscription Automation (DrissionPage Version) Created: 2026-01-12 Version: 3.0-drission """ import time import random import string import re import sys import os import platform import subprocess import requests from pathlib import Path from DrissionPage import ChromiumPage, ChromiumOptions # ================= 配置加载 ================= try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: tomllib = None BASE_DIR = Path(__file__).parent CONFIG_FILE = BASE_DIR / "config.toml" def _load_config(): """从 config.toml 加载配置""" if tomllib is None or not CONFIG_FILE.exists(): return {} try: with open(CONFIG_FILE, "rb") as f: return tomllib.load(f) except Exception: return {} _cfg = _load_config() _autogptplus = _cfg.get("autogptplus", {}) # ================= 核心配置区域 ================= # 从 config.toml [autogptplus] 读取,如未配置则使用默认值 # 1. 管理员 Token MAIL_API_TOKEN = _autogptplus.get("mail_api_token", "") # 2. 你的域名后缀(随机选择) EMAIL_DOMAINS = _autogptplus.get("email_domains", []) # 3. Cloud-Mail 部署地址 MAIL_API_BASE = _autogptplus.get("mail_api_base", "") # 4. 接口路径 (邮件查询) MAIL_API_PATH = "/api/public/emailList" # 5. SEPA IBAN 列表 (从配置文件读取) SEPA_IBANS = _autogptplus.get("sepa_ibans", []) # 6. 随机指纹开关 RANDOM_FINGERPRINT = _autogptplus.get("random_fingerprint", True) # ================= 浏览器指纹 ================= FINGERPRINTS = [ # NVIDIA 显卡 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. 
(NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1440} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4080 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 3840, "height": 2160} }, # AMD 显卡 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (AMD)", "webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6800 XT Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1440} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (AMD)", "webgl_renderer": "ANGLE (AMD, AMD Radeon RX 7900 XTX Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 3840, "height": 2160} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (AMD)", "webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6700 XT Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, # Intel 显卡 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. 
(Intel)", "webgl_renderer": "ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (Intel)", "webgl_renderer": "ANGLE (Intel, Intel(R) Iris Xe Graphics Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (Intel)", "webgl_renderer": "ANGLE (Intel, Intel(R) Arc A770 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1440} }, # 笔记本配置 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3050 Laptop GPU Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 Laptop GPU Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1600} }, ] def get_random_fingerprint() -> dict: """随机获取一个浏览器指纹""" return random.choice(FINGERPRINTS) def inject_fingerprint(page, fingerprint: dict): """注入浏览器指纹伪装脚本""" try: webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. 
(NVIDIA)") webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)") plat = fingerprint.get("platform", "Win32") screen = fingerprint.get("screen", {"width": 1920, "height": 1080}) js_script = f''' // 伪装 WebGL 指纹 const getParameterProxyHandler = {{ apply: function(target, thisArg, args) {{ const param = args[0]; if (param === 37445) {{ return "{webgl_vendor}"; }} if (param === 37446) {{ return "{webgl_renderer}"; }} return Reflect.apply(target, thisArg, args); }} }}; const originalGetParameter = WebGLRenderingContext.prototype.getParameter; WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler); if (typeof WebGL2RenderingContext !== 'undefined') {{ const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter; WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler); }} // 伪装 platform Object.defineProperty(navigator, 'platform', {{ get: () => "{plat}" }}); // 伪装屏幕分辨率 Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]} }}); Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]} }}); Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]} }}); Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]} }}); // 隐藏 webdriver 特征 Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined }}); // 伪装 languages Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"] }}); // 伪装 plugins Object.defineProperty(navigator, 'plugins', {{ get: () => [ {{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }}, {{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }}, {{ name: "Native Client", filename: "internal-nacl-plugin" }} ] }}); ''' page.run_js(js_script) log_status("指纹", f"已注入: {webgl_renderer[:40]}...") except Exception as e: log_status("指纹", f"注入失败: {e}") # ================= IBAN 管理函数 ================= 
IBAN_FILE = BASE_DIR / "sepa_ibans.txt" def load_ibans_from_file(): """从文件加载 IBAN 列表""" if not IBAN_FILE.exists(): return [] try: with open(IBAN_FILE, "r", encoding="utf-8") as f: ibans = [line.strip() for line in f if line.strip() and line.strip().startswith("DE")] return ibans except Exception: return [] def save_ibans_to_file(ibans: list): """保存 IBAN 列表到文件""" try: with open(IBAN_FILE, "w", encoding="utf-8") as f: f.write("\n".join(ibans)) return True except Exception: return False def get_sepa_ibans(): """获取 SEPA IBAN 列表 (优先从文件读取)""" file_ibans = load_ibans_from_file() if file_ibans: return file_ibans return SEPA_IBANS def add_sepa_ibans(new_ibans: list) -> tuple: """添加 IBAN 到列表 Args: new_ibans: 新的 IBAN 列表 Returns: tuple: (添加数量, 跳过数量, 当前总数) """ current = set(load_ibans_from_file()) added = 0 skipped = 0 for iban in new_ibans: iban = iban.strip().upper() if not iban or not iban.startswith("DE"): continue if iban in current: skipped += 1 else: current.add(iban) added += 1 save_ibans_to_file(sorted(current)) return added, skipped, len(current) def clear_sepa_ibans(): """清空 IBAN 列表""" if IBAN_FILE.exists(): IBAN_FILE.unlink() return True # ================= 域名管理函数 ================= DOMAIN_FILE = BASE_DIR / "email_domains.txt" def load_domains_from_file(): """从文件加载域名列表""" if not DOMAIN_FILE.exists(): return [] try: with open(DOMAIN_FILE, "r", encoding="utf-8") as f: domains = [line.strip() for line in f if line.strip() and line.strip().startswith("@")] return domains except Exception: return [] def save_domains_to_file(domains: list): """保存域名列表到文件""" try: with open(DOMAIN_FILE, "w", encoding="utf-8") as f: f.write("\n".join(domains)) return True except Exception: return False def get_email_domains(): """获取邮箱域名列表 (合并文件和配置)""" file_domains = set(load_domains_from_file()) config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set() # 合并两个来源的域名 all_domains = file_domains | config_domains return sorted(all_domains) if all_domains else [] def 
add_email_domains(new_domains: list) -> tuple: """添加域名到列表 Args: new_domains: 新的域名列表 Returns: tuple: (添加数量, 跳过数量, 当前总数) """ # 获取当前所有域名(文件 + 配置) current = set(load_domains_from_file()) config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set() all_existing = current | config_domains added = 0 skipped = 0 for domain in new_domains: domain = domain.strip().lower() # 确保以 @ 开头 if not domain.startswith("@"): domain = "@" + domain if not domain or len(domain) < 4: # 至少 @x.y continue if domain in all_existing: skipped += 1 else: current.add(domain) all_existing.add(domain) added += 1 # 只保存通过 Bot 添加的域名到文件 save_domains_to_file(sorted(current)) return added, skipped, len(all_existing) def remove_email_domain(domain: str) -> bool: """删除指定域名 (只能删除通过 Bot 添加的域名) Args: domain: 要删除的域名 Returns: bool: 是否删除成功 """ current = set(load_domains_from_file()) domain = domain.strip().lower() if not domain.startswith("@"): domain = "@" + domain if domain in current: current.remove(domain) save_domains_to_file(sorted(current)) return True return False def clear_email_domains(): """清空域名列表""" if DOMAIN_FILE.exists(): DOMAIN_FILE.unlink() return True # ================= 固定配置 ================= TARGET_URL = "https://chatgpt.com" def generate_random_birthday(): """生成随机生日 (2000-2004年)""" year = random.randint(2000, 2004) month = random.randint(1, 12) # 根据月份确定天数 if month in [1, 3, 5, 7, 8, 10, 12]: max_day = 31 elif month in [4, 6, 9, 11]: max_day = 30 else: # 2月 max_day = 29 if year % 4 == 0 else 28 day = random.randint(1, max_day) return str(year), f"{month:02d}", f"{day:02d}" # 地址格式: (街道, 邮编, 城市) SEPA_ADDRESSES = [ # 柏林 ("Alexanderplatz 1", "10178", "Berlin"), ("Unter den Linden 77", "10117", "Berlin"), ("Kurfürstendamm 21", "10719", "Berlin"), ("Friedrichstraße 43", "10117", "Berlin"), ("Potsdamer Platz 1", "10785", "Berlin"), ("Tauentzienstraße 9", "10789", "Berlin"), # 慕尼黑 ("Marienplatz 8", "80331", "München"), ("Leopoldstraße 32", "80802", "München"), ("Maximilianstraße 17", "80539", 
"München"), ("Kaufingerstraße 28", "80331", "München"), ("Sendlinger Straße 3", "80331", "München"), # 汉堡 ("Mönckebergstraße 16", "20095", "Hamburg"), ("Jungfernstieg 38", "20354", "Hamburg"), ("Spitalerstraße 12", "20095", "Hamburg"), ("Neuer Wall 50", "20354", "Hamburg"), # 法兰克福 ("Zeil 106", "60313", "Frankfurt am Main"), ("Kaiserstraße 62", "60329", "Frankfurt am Main"), ("Goethestraße 1", "60313", "Frankfurt am Main"), ("Große Bockenheimer Str. 2", "60313", "Frankfurt am Main"), # 科隆 ("Hohe Straße 111", "50667", "Köln"), ("Schildergasse 24", "50667", "Köln"), ("Breite Straße 80", "50667", "Köln"), # 斯图加特 ("Königstraße 2", "70173", "Stuttgart"), ("Calwer Straße 19", "70173", "Stuttgart"), ("Schulstraße 5", "70173", "Stuttgart"), # 杜塞尔多夫 ("Königsallee 60", "40212", "Düsseldorf"), ("Schadowstraße 11", "40212", "Düsseldorf"), ("Flinger Straße 36", "40213", "Düsseldorf"), # 莱比锡 ("Grimmaische Straße 25", "04109", "Leipzig"), ("Petersstraße 36", "04109", "Leipzig"), # 德累斯顿 ("Prager Straße 12", "01069", "Dresden"), ("Altmarkt 10", "01067", "Dresden"), # 纽伦堡 ("Karolinenstraße 12", "90402", "Nürnberg"), ("Breite Gasse 23", "90402", "Nürnberg"), # 汉诺威 ("Georgstraße 10", "30159", "Hannover"), ("Bahnhofstraße 5", "30159", "Hannover"), # 不来梅 ("Obernstraße 2", "28195", "Bremen"), ("Sögestraße 18", "28195", "Bremen"), ] FIRST_NAMES = [ # 德国常见男性名 "Lukas", "Leon", "Maximilian", "Felix", "Paul", "Jonas", "Tim", "David", "Niklas", "Jan", "Philipp", "Moritz", "Alexander", "Sebastian", "Florian", "Julian", "Tobias", "Simon", "Daniel", "Christian", "Markus", "Thomas", "Michael", "Stefan", "Andreas", "Martin", "Matthias", "Benjamin", "Patrick", # 德国常见女性名 "Anna", "Laura", "Julia", "Lena", "Sarah", "Lisa", "Marie", "Sophie", "Katharina", "Hannah", "Emma", "Mia", "Lea", "Johanna", "Clara", "Charlotte", "Emilia", "Luisa", "Nina", "Elena", "Melanie", "Christina", "Sandra", "Nicole", "Sabine", "Claudia", "Petra", "Monika", "Stefanie", ] LAST_NAMES = [ # 德国最常见姓氏 "Müller", "Schmidt", 
"Schneider", "Fischer", "Weber", "Meyer", "Wagner", "Becker", "Schulz", "Hoffmann", "Schäfer", "Koch", "Bauer", "Richter", "Klein", "Wolf", "Schröder", "Neumann", "Schwarz", "Zimmermann", "Braun", "Krüger", "Hofmann", "Hartmann", "Lange", "Schmitt", "Werner", "Schmitz", "Krause", "Meier", "Lehmann", "Schmid", "Schulze", "Maier", "Köhler", "Herrmann", "König", "Walter", "Mayer", "Huber", "Kaiser", "Fuchs", "Peters", "Lang", "Scholz", "Möller", "Weiß", "Jung", "Hahn", "Schubert", ] # ================= 工具函数 ================= def cleanup_chrome_processes(): """清理残留的 Chrome 进程 (跨平台支持)""" try: if platform.system() == "Windows": # Windows: 使用 taskkill 清理 chromedriver 和 chrome try: subprocess.run( ['taskkill', '/F', '/IM', 'chromedriver.exe'], capture_output=True, timeout=5 ) except Exception: pass # 清理无头模式的 chrome 进程 (带 --headless 参数的) try: result = subprocess.run( ['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'], capture_output=True, text=True, timeout=5 ) for line in result.stdout.strip().split('\n'): pid = line.strip() if pid.isdigit(): subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5) except Exception: pass log_status("清理", "已清理 Chrome 残留进程") else: # Linux/Mac: 使用 pkill try: subprocess.run( ['pkill', '-f', 'chromedriver'], capture_output=True, timeout=5 ) except Exception: pass # 清理无头模式的 chrome 进程 try: subprocess.run( ['pkill', '-f', 'chrome.*--headless'], capture_output=True, timeout=5 ) except Exception: pass log_status("清理", "已清理 Chrome 残留进程") except Exception: pass # 静默处理,不影响主流程 def log_status(step, message): timestamp = time.strftime("%H:%M:%S") print(f"[{timestamp}] [{step}] {message}") sys.stdout.flush() def log_progress(message): print(f" -> {message}") sys.stdout.flush() def save_account(email, password, token, account_id=""): """保存账号信息到 JSON 文件""" import json accounts_file = "accounts.json" # 读取现有账号 accounts = [] try: with open(accounts_file, 'r', encoding='utf-8') as f: 
accounts = json.load(f) except: pass # 添加新账号 account_data = { "account": email, "password": password, "token": token } if account_id: account_data["account_id"] = account_id accounts.append(account_data) # 保存 with open(accounts_file, 'w', encoding='utf-8') as f: json.dump(accounts, f, ensure_ascii=False, indent=2) log_status("保存", f"账号已保存到 {accounts_file}") def get_verification_content(target_email, max_retries=90): log_status("API监听", f"正在监听邮件 ({MAIL_API_PATH})...") headers = { "Authorization": MAIL_API_TOKEN, "Content-Type": "application/json" } start_time = time.time() for i in range(max_retries): elapsed = int(time.time() - start_time) try: url = f"{MAIL_API_BASE}{MAIL_API_PATH}" payload = { "toEmail": target_email, "timeSort": "desc", "size": 20 } resp = requests.post(url, headers=headers, json=payload, timeout=10) if resp.status_code == 200: data = resp.json() if data.get('code') == 200: mails = data.get('data', []) if mails: for mail in mails: log_status("捕获", "✅ 成功抓取到目标邮件!") html_body = mail.get('content') or mail.get('text') or str(mail) # 提取验证码 code_match = re.search(r'\b(\d{6})\b', html_body) if code_match: code = code_match.group(1) log_status("解析", f"提取到验证码: {code}") return {"type": "code", "val": code} # 提取链接 link_match = re.search(r'href="(https://.*openai\.com/.*verification.*)"', html_body) if not link_match: link_match = re.search(r'href="(https://auth0\.openai\.com/u/login/identifier\?state=[^"]+)"', html_body) if link_match: link = link_match.group(1) log_status("解析", f"提取到链接: {link[:30]}...") return {"type": "link", "val": link} except: pass if i % 5 == 0: print(f" [监听中] 已耗时 {elapsed}秒...") sys.stdout.flush() time.sleep(2) log_status("超时", "❌ 未能获取验证码。") return None def run_payment_flow(page, email, step_callback=None): """执行 SEPA 支付流程 - 严格按顺序执行,任一步骤失败则终止 Args: page: 浏览器页面对象 email: 邮箱地址 step_callback: 步骤回调函数 (step: str) """ def step_cb(step): if step_callback: step_callback(step) log_status("支付流程", "开始处理 Stripe 支付页...") try: # 
等待支付页加载完成(等待邮箱输入框出现) step_cb("等待支付页加载...") log_status("支付页", "等待支付页加载...") try: page.ele('#email', timeout=10) log_progress("✓ 支付页已加载") except: time.sleep(2) # 兜底等待 # 随机选择 IBAN 和地址 ibans = get_sepa_ibans() if not ibans: log_progress("❌ 没有可用的 IBAN,请先通过 Bot 导入") return None sepa_iban = random.choice(ibans) street, postal_code, city = random.choice(SEPA_ADDRESSES) account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" log_progress(f"使用 IBAN: {sepa_iban[:8]}...") log_progress(f"使用地址: {street}, {postal_code} {city}") log_progress(f"账户名: {account_name}") # ========== 步骤 1: 填写邮箱 ========== step_cb("填写支付邮箱...") log_progress("[步骤1] 填写邮箱...") try: email_input = page.ele('#email', timeout=10) if not email_input: log_progress("❌ 邮箱输入框未找到") return None email_input.clear() email_input.input(email) log_progress(f"✓ 已填写邮箱: {email}") time.sleep(1) except Exception as e: log_progress(f"❌ 邮箱填写失败: {e}") return None # ========== 步骤 2: 选择 SEPA ========== step_cb("选择 SEPA 支付方式...") log_progress("[步骤2] 选择 SEPA 直接借记...") time.sleep(2) sepa_clicked = False # 定位方式(按速度排序:属性选择器 > CSS > xpath) sepa_selectors = [ '@data-testid=sepa_debit-accordion-item-button', # 最快:属性选择器 'css:button[data-testid*="sepa"]', # 快:CSS 模糊匹配 'xpath://button[contains(., "SEPA")]', # 备用:xpath 文本匹配 ] for selector in sepa_selectors: try: sepa_btn = page.ele(selector, timeout=2) if sepa_btn: page.run_js('arguments[0].click()', sepa_btn) sepa_clicked = True log_progress("✓ 已点击 SEPA 按钮") time.sleep(2) break except: continue if not sepa_clicked: # 最后尝试 JS try: result = page.run_js(''' const btns = document.querySelectorAll('button'); for(let btn of btns) { if(btn.innerText.includes('SEPA')) { btn.click(); return true; } } return false; ''') if result: sepa_clicked = True log_progress("✓ 已点击 SEPA (JS)") time.sleep(2) except: pass if not sepa_clicked: log_progress("❌ SEPA 选择失败") return None # 验证 SEPA 是否真正展开(检查 IBAN 输入框是否出现) log_progress("验证 SEPA 是否展开...") time.sleep(2) try: iban_check = page.ele('#iban', 
timeout=5) if not iban_check: log_progress("❌ SEPA 未展开,IBAN 输入框未出现") return None log_progress("✓ SEPA 已展开,IBAN 输入框已出现") except: log_progress("❌ SEPA 未展开,IBAN 输入框未出现") return None # ========== 步骤 3: 填写 IBAN ========== step_cb("填写 IBAN...") log_progress("[步骤3] 填写 IBAN...") try: # 优先使用 #iban (Stripe 标准 id),更快 iban_input = page.ele('#iban', timeout=5) if not iban_input: iban_input = page.ele('@name=iban', timeout=3) if not iban_input: log_progress("❌ IBAN 输入框未找到") return None iban_input.input(sepa_iban) log_progress(f"✓ 已填写 IBAN: {sepa_iban}") time.sleep(1) except Exception as e: log_progress(f"❌ IBAN 填写失败: {e}") return None # ========== 步骤 4: 填写账户姓名 ========== step_cb("填写账户姓名...") log_progress("[步骤4] 填写账户姓名...") try: # 优先使用 billingName (Stripe 支付页面标准 id) name_input = page.ele('#billingName', timeout=5) if not name_input: name_input = page.ele('@name=billingName', timeout=3) if not name_input: name_input = page.ele('xpath://input[@name="name" or contains(@id, "name") or contains(@placeholder, "姓名")]', timeout=3) if not name_input: log_progress("❌ 姓名输入框未找到") return None name_input.input(account_name) log_progress(f"✓ 已填写账户姓名: {account_name}") time.sleep(1) except Exception as e: log_progress(f"❌ 账户姓名填写失败: {e}") return None # ========== 步骤 5: 填写地址 ========== step_cb("填写账单地址...") log_progress("[步骤5] 填写地址...") try: # 检查是否需要点击"手动输入地址"(仅当地址输入框不存在时) addr_input = page.ele('#billingAddressLine1', timeout=1) if not addr_input: # 尝试点击手动输入地址按钮 try: manual_btn = page.ele('@data-testid=manual-address-entry', timeout=1) if manual_btn: manual_btn.click() time.sleep(0.5) except: pass addr_input = page.ele('#billingAddressLine1', timeout=3) if not addr_input: log_progress("❌ 地址输入框未找到") return None # 一次性填写所有地址字段 addr_input.input(street) log_progress(f"✓ 已填写地址: {street}") postal_input = page.ele('#billingPostalCode', timeout=1) if postal_input: postal_input.input(postal_code) log_progress(f"✓ 已填写邮编: {postal_code}") city_input = page.ele('#billingLocality', timeout=1) if city_input: 
city_input.input(city) log_progress(f"✓ 已填写城市: {city}") # 关闭 Google 地址建议弹窗 page.actions.key_down('Escape').key_up('Escape') time.sleep(0.3) page.run_js('document.body.click()') except Exception as e: log_progress(f"❌ 地址填写失败: {e}") return None # ========== 步骤 6: 勾选条款 ========== step_cb("勾选服务条款...") log_progress("[步骤6] 勾选条款...") try: terms_checkbox = page.ele('#termsOfServiceConsentCheckbox', timeout=5) if terms_checkbox: terms_checkbox.click() log_progress("✓ 已勾选条款") time.sleep(1) except Exception as e: log_progress(f"⚠ 条款勾选失败(可能已勾选): {e}") # ========== 步骤 7: 点击订阅 ========== step_cb("提交订阅...") log_progress("[步骤7] 点击订阅按钮...") time.sleep(2) subscribe_processing = False # 尝试点击订阅按钮并验证是否进入处理状态 for attempt in range(3): try: subscribe_btn = page.ele('css:button[type="submit"]', timeout=5) if subscribe_btn: subscribe_btn.click() log_progress(f"[尝试{attempt+1}] 已点击订阅按钮,等待处理状态...") # 等待按钮变成"正在处理"状态(检测 disabled 属性或 spinner) for _ in range(10): time.sleep(0.5) try: # 检查按钮是否被禁用(处理中) btn_disabled = page.run_js(''' const btn = document.querySelector('button[type="submit"]'); if (!btn) return false; return btn.disabled || btn.classList.contains('processing') || btn.querySelector('.spinner, .loading, svg') !== null; ''') if btn_disabled: subscribe_processing = True log_progress("✓ 订阅按钮已进入处理状态") break # 检查 URL 是否已经变化(支付成功) if 'success' in page.url: subscribe_processing = True log_progress("✓ 已检测到支付成功") break except: pass if subscribe_processing: break except: pass if not subscribe_processing: # JS 备用点击 try: page.run_js('document.querySelector("button[type=submit]").click()') time.sleep(2) except: pass if not subscribe_processing: log_progress("⚠ 未检测到处理状态,继续等待支付结果...") # ========== 步骤 8: 等待支付成功 ========== step_cb("等待支付处理...") log_status("等待", "等待支付处理(超时60秒)...") try: page.wait.url_change('payments/success-team', timeout=60) log_status("成功", "✓ 支付成功!") except: log_status("超时", "❌ 支付未在60秒内完成") return None # ========== 步骤 9: 获取 token 和 account_id ========== step_cb("获取 Token...") 
log_status("获取", "正在获取 access token...") time.sleep(2) page.get("https://chatgpt.com/api/auth/session") time.sleep(2) try: # 获取页面内容(JSON) session_text = page.ele('tag:pre', timeout=5).text import json session_data = json.loads(session_text) access_token = session_data.get('accessToken', '') if access_token: log_status("成功", f"获取到 token: {access_token[:50]}...") # 获取 account_id step_cb("获取 Account ID...") account_id = fetch_account_id(page, access_token) return { "token": access_token, "account_id": account_id } else: log_progress("未找到 accessToken") return None except Exception as e: log_progress(f"获取 token 失败: {e}") return None except Exception as e: log_status("错误", f"[X] 支付流程异常: {e}") return None def fetch_account_id(page, access_token: str) -> str: """通过 API 获取 account_id""" log_status("获取", "正在获取 account_id...") try: page.get("https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27") time.sleep(2) # 使用 JS 请求 API result = page.run_js(f''' return fetch("https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27", {{ headers: {{ "Authorization": "Bearer {access_token}", "Content-Type": "application/json" }} }}) .then(r => r.json()) .then(data => JSON.stringify(data)) .catch(e => "error:" + e); ''') if result and not result.startswith("error:"): import json data = json.loads(result) accounts = data.get("accounts", {}) # 优先查找 Team 账户 for acc_id, acc_info in accounts.items(): if acc_id == "default": continue account_data = acc_info.get("account", {}) plan_type = account_data.get("plan_type", "") if "team" in plan_type.lower(): log_status("成功", f"获取到 account_id: {acc_id[:8]}...") return acc_id # 取第一个非 default 的 for acc_id in accounts.keys(): if acc_id != "default": log_status("成功", f"获取到 account_id: {acc_id[:8]}...") return acc_id except Exception as e: log_progress(f"获取 account_id 失败: {e}") return "" def run_main_process(): # 检查必要配置 if not MAIL_API_TOKEN or not MAIL_API_BASE or not EMAIL_DOMAINS: print("\n" + "="*60) print("❌ 配置错误: 请在 config.toml 中配置 
[autogptplus] 段") print(" - mail_api_token: Cloud Mail API Token") print(" - mail_api_base: Cloud Mail API 地址") print(" - email_domains: 可用邮箱域名列表") print("="*60 + "\n") return # 清理可能残留的 Chrome 调试进程 cleanup_chrome_processes() # === 1. 生成 15 位随机账号 + 同密码 === random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15)) domains = get_email_domains() if not domains: print("\n" + "="*60) print("❌ 配置错误: 没有可用的邮箱域名") print(" 请在 config.toml 中配置 email_domains 或通过 Bot 添加") print("="*60 + "\n") return email_domain = random.choice(domains) email = f"{random_str}{email_domain}" # 生成符合要求的密码:大小写字母+数字+特殊字符,至少12位 password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \ ''.join(random.choices(string.ascii_lowercase, k=8)) + \ ''.join(random.choices(string.digits, k=2)) + \ random.choice('!@#$%') real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" print("\n" + "="*60) log_status("初始化", f"生成账号: {email}") log_status("初始化", f"设置密码: {password}") print("="*60 + "\n") # 检测操作系统 is_linux = platform.system() == "Linux" # 获取随机指纹 fingerprint = None if RANDOM_FINGERPRINT: fingerprint = get_random_fingerprint() log_status("指纹", f"{fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}") else: fingerprint = { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. 
(NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} } log_status("指纹", "使用默认指纹 (RTX 3060)") # 配置 DrissionPage - 与项目 browser_automation.py 保持一致 co = ChromiumOptions() co.set_argument('--no-first-run') # 跳过首次运行 co.set_argument('--disable-infobars') co.set_argument('--incognito') # 无痕模式 co.set_argument('--disable-gpu') # 减少资源占用 co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题 co.set_argument('--no-sandbox') # 服务器环境需要 co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征 co.set_argument('--lang=zh-CN') # 设置语言为中文简体 co.set_argument(f'--user-agent={fingerprint["user_agent"]}') # 设置 User-Agent # Linux 服务器特殊配置 if is_linux: co.set_argument('--disable-software-rasterizer') co.set_argument('--disable-extensions') co.set_argument('--disable-setuid-sandbox') co.set_argument('--single-process') # 某些 Linux 环境需要 co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口 # 尝试查找 Chrome/Chromium 路径 chrome_paths = [ '/usr/bin/google-chrome', '/usr/bin/google-chrome-stable', '/usr/bin/chromium-browser', '/usr/bin/chromium', '/snap/bin/chromium', ] for chrome_path in chrome_paths: if os.path.exists(chrome_path): co.set_browser_path(chrome_path) log_status("浏览器", f"使用浏览器: {chrome_path}") break else: co.auto_port(True) # Windows 使用自动分配端口 co.set_local_port(random.randint(19222, 29999)) # 备用:手动设置随机端口 # 无头模式 (服务器运行) screen = fingerprint.get("screen", {"width": 1920, "height": 1080}) co.set_argument('--headless=new') co.set_argument(f'--window-size={screen["width"]},{screen["height"]}') log_status("浏览器", f"正在启动浏览器 (无头模式, {'Linux' if is_linux else 'Windows'})...") try: page = ChromiumPage(co) # 注入指纹伪装 if RANDOM_FINGERPRINT: inject_fingerprint(page, fingerprint) except Exception as e: log_status("浏览器", f"首次启动失败: {e},尝试清理后重试...") # 清理残留进程后重试 cleanup_chrome_processes() time.sleep(2) # 重新配置 co2 = ChromiumOptions() co2.set_argument('--no-first-run') co2.set_argument('--disable-infobars') 
co2.set_argument('--incognito') co2.set_argument('--disable-gpu') co2.set_argument('--disable-dev-shm-usage') co2.set_argument('--no-sandbox') co2.set_argument('--disable-blink-features=AutomationControlled') co2.set_argument('--lang=zh-CN') co2.set_argument(f'--user-agent={fingerprint["user_agent"]}') co2.set_argument('--headless=new') co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}') if is_linux: co2.set_argument('--disable-software-rasterizer') co2.set_argument('--disable-extensions') co2.set_argument('--disable-setuid-sandbox') co2.set_argument('--single-process') co2.set_argument('--remote-debugging-port=0') for chrome_path in chrome_paths: if os.path.exists(chrome_path): co2.set_browser_path(chrome_path) break else: co2.set_local_port(random.randint(30000, 39999)) page = ChromiumPage(co2) # 注入指纹伪装 if RANDOM_FINGERPRINT: inject_fingerprint(page, fingerprint) try: # === 注册流程 === log_status("步骤 1/5", "打开 ChatGPT 注册页...") page.get(TARGET_URL) # 使用 data-testid 定位登录按钮 login_btn = page.ele('@data-testid=login-button', timeout=30) if not login_btn: login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10) login_btn.click() log_progress("填入邮箱...") email_input = page.ele('@name=email', timeout=30) email_input.input(email) page.ele('xpath://button[@type="submit"]').click() log_progress("填入密码...") password_input = page.ele('xpath://input[@type="password"]', timeout=30) password_input.input(password) page.ele('xpath://button[@type="submit"]').click() log_status("步骤 2/5", "等待邮件 (All Mail)...") # === 接码与验证 === verify_data = get_verification_content(email) if verify_data: log_status("步骤 3/5", "执行验证...") if verify_data['type'] == 'link': page.new_tab(verify_data['val']) time.sleep(5) elif verify_data['type'] == 'code': code = verify_data['val'] log_progress(f"填入验证码: {code}") try: code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15) code_input.input(code) # 点击继续(使用 type=submit) 
time.sleep(1) try: log_progress("尝试点击继续按钮...") continue_btn = page.ele('css:button[type="submit"]', timeout=5) if continue_btn: continue_btn.click() except: log_progress("未找到按钮或已自动跳转...") except Exception as e: log_progress(f"验证码填入异常: {e}") # === 资料填写 (姓名+生日) === log_status("步骤 4/5", "进入信息填写页...") try: name_input = page.ele('@name=name', timeout=20) name_input.input(real_name) # 使用 Tab 键切换到生日字段 page.actions.key_down('Tab').key_up('Tab') # 生成随机生日并输入 birth_year, birth_month, birth_day = generate_random_birthday() birth_str = birth_year + birth_month + birth_day log_progress(f"生日: {birth_year}-{birth_month}-{birth_day}") for digit in birth_str: page.actions.type(digit) time.sleep(0.1) time.sleep(1.5) # 点击继续/完成注册(使用 type=submit) final_reg_btn = page.ele('css:button[type="submit"]', timeout=20) final_reg_btn.click() # ======================================================= # 【关键节点】等待进入主页后再执行 JS 跳转到支付页 # ======================================================= log_status("订阅", "等待进入 ChatGPT 主页...") # 等待 URL 变成 chatgpt.com(不含 auth 路径) for _ in range(30): current_url = page.url if 'chatgpt.com' in current_url and 'auth' not in current_url and 'login' not in current_url: log_progress(f"✓ 已进入主页: {current_url[:50]}...") break time.sleep(1) else: log_progress("⚠ 等待主页超时,尝试继续...") # 额外等待页面稳定 time.sleep(3) log_status("订阅", "执行 JS 跳转到支付页...") # 直接执行 JS 跳转到支付页 checkout_js = ''' (async function(){ try { const t = await(await fetch("/api/auth/session")).json(); if(!t.accessToken){ return "请先登录ChatGPT!"; } const p = { plan_name: "chatgptteamplan", team_plan_data: { workspace_name: "Sepa", price_interval: "month", seat_quantity: 5 }, billing_details: { country: "DE", currency: "EUR" }, promo_campaign: { promo_campaign_id: "team-1-month-free", is_coupon_from_query_param: true }, checkout_ui_mode: "redirect" }; const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", { method: "POST", headers: { Authorization: "Bearer " + t.accessToken, "Content-Type": "application/json" 
# JavaScript run on the logged-in ChatGPT page: it reads the session access
# token, POSTs a Team-plan checkout request and, when the response carries a
# URL, redirects the browser to it. The returned string is a status message.
_REGISTRATION_CHECKOUT_JS = '''
(async function(){
    try {
        const t = await(await fetch("/api/auth/session")).json();
        if(!t.accessToken){
            return "请先登录ChatGPT!";
        }
        const p = {
            plan_name: "chatgptteamplan",
            team_plan_data: {
                workspace_name: "Sepa",
                price_interval: "month",
                seat_quantity: 5
            },
            billing_details: {
                country: "DE",
                currency: "EUR"
            },
            promo_campaign: {
                promo_campaign_id: "team-1-month-free",
                is_coupon_from_query_param: true
            },
            checkout_ui_mode: "redirect"
        };
        const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", {
            method: "POST",
            headers: {
                Authorization: "Bearer " + t.accessToken,
                "Content-Type": "application/json"
            },
            body: JSON.stringify(p)
        });
        const d = await r.json();
        if(d.url){
            window.location.href = d.url;
            return "success";
        } else {
            return "提取失败:" + (d.detail || JSON.stringify(d));
        }
    } catch(e) {
        return "发生错误:" + e;
    }
})();
'''


def _build_registration_browser_options(fingerprint, is_linux):
    """Build the headless ChromiumOptions used by run_single_registration.

    Args:
        fingerprint: dict providing "user_agent" and, optionally, "screen"
            (a dict with "width" and "height").
        is_linux: True when running on Linux; selects the Linux-specific
            flags and the browser binary lookup.

    Returns:
        The configured ChromiumOptions instance.
    """
    co = ChromiumOptions()
    common_args = (
        '--no-first-run',
        '--disable-infobars',
        '--incognito',
        '--disable-gpu',
        '--disable-dev-shm-usage',
        '--no-sandbox',
        '--disable-blink-features=AutomationControlled',
        '--lang=zh-CN',
        f'--user-agent={fingerprint["user_agent"]}',
    )
    for arg in common_args:
        co.set_argument(arg)

    if is_linux:
        linux_args = (
            '--disable-software-rasterizer',
            '--disable-extensions',
            '--disable-setuid-sandbox',
            '--single-process',
            '--remote-debugging-port=0',
        )
        for arg in linux_args:
            co.set_argument(arg)

        # Use the first browser binary that exists on this machine.
        chrome_paths = [
            '/usr/bin/google-chrome',
            '/usr/bin/google-chrome-stable',
            '/usr/bin/chromium-browser',
            '/usr/bin/chromium',
            '/snap/bin/chromium',
        ]
        for chrome_path in chrome_paths:
            if os.path.exists(chrome_path):
                co.set_browser_path(chrome_path)
                break
    else:
        co.auto_port(True)
        co.set_local_port(random.randint(19222, 29999))

    screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
    co.set_argument('--headless=new')
    co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
    return co


def run_single_registration(progress_callback=None, step_callback=None) -> dict:
    """Run one registration + subscription attempt (intended for the Bot).

    The flow is: validate configuration, generate random account data, start
    a headless browser, sign up, confirm the e-mail (link or code), fill in
    name and birthday, trigger the checkout script and finally hand over to
    run_payment_flow().

    Args:
        progress_callback: optional callable(message: str) receiving log
            lines; every line is also printed.
        step_callback: optional callable(step: str) receiving the name of
            the step currently being executed.

    Returns:
        dict: always contains "success" (bool).
            * On success: "account", "password", "token", "account_id".
            * On a configuration problem: "error" only.
            * On any later failure: "error", "account", "password".
    """
    def log_cb(msg):
        if progress_callback:
            progress_callback(msg)
        print(msg)

    def step_cb(step):
        if step_callback:
            step_callback(step)

    # --- Pre-flight checks: nothing is started unless config is complete ---
    if not MAIL_API_TOKEN or not MAIL_API_BASE:
        return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"}

    domains = get_email_domains()
    if not domains:
        return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"}

    if not get_sepa_ibans():
        return {"success": False, "error": "没有可用的 IBAN,请先通过 /iban_add 导入"}

    step_cb("生成账号信息...")

    # --- Random account data ---
    random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15))
    email_domain = random.choice(domains)
    # NOTE(review): the domain is appended verbatim, so entries are presumably
    # stored with their leading "@" - confirm against get_email_domains().
    email = f"{random_str}{email_domain}"
    password = ''.join([
        ''.join(random.choices(string.ascii_uppercase, k=2)),
        ''.join(random.choices(string.ascii_lowercase, k=8)),
        ''.join(random.choices(string.digits, k=2)),
        random.choice('!@#$%'),
    ])
    real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"

    log_cb(f"生成账号: {email}")

    is_linux = platform.system() == "Linux"

    step_cb("启动浏览器...")

    # --- Fingerprint: random one if enabled, otherwise a fixed default ---
    if RANDOM_FINGERPRINT:
        fingerprint = get_random_fingerprint()
    else:
        fingerprint = {
            "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
            "platform": "Win32",
            "webgl_vendor": "Google Inc. (NVIDIA)",
            "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
            "screen": {"width": 1920, "height": 1080}
        }

    co = _build_registration_browser_options(fingerprint, is_linux)

    page = None
    try:
        page = ChromiumPage(co)
        if RANDOM_FINGERPRINT:
            inject_fingerprint(page, fingerprint)

        # --- Sign-up form ---
        step_cb("打开注册页面...")
        log_cb("打开 ChatGPT 注册页...")
        page.get(TARGET_URL)

        step_cb("点击登录按钮...")
        login_btn = page.ele('@data-testid=login-button', timeout=30)
        if not login_btn:
            # Fallback selector when the primary test id is not present.
            login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10)
        login_btn.click()

        step_cb("填写邮箱...")
        log_cb("填入邮箱...")
        email_input = page.ele('@name=email', timeout=30)
        email_input.input(email)
        page.ele('xpath://button[@type="submit"]').click()

        step_cb("填写密码...")
        log_cb("填入密码...")
        password_input = page.ele('xpath://input[@type="password"]', timeout=30)
        password_input.input(password)
        page.ele('xpath://button[@type="submit"]').click()

        # --- E-mail verification ---
        step_cb("等待验证邮件...")
        log_cb("等待验证邮件...")
        verify_data = get_verification_content(email)

        if not verify_data:
            return {"success": False, "error": "未能获取验证码", "account": email, "password": password}

        step_cb("执行邮箱验证...")
        log_cb("执行验证...")
        if verify_data['type'] == 'link':
            page.new_tab(verify_data['val'])
            time.sleep(5)
        elif verify_data['type'] == 'code':
            code = verify_data['val']
            log_cb(f"填入验证码: {code}")
            code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15)
            code_input.input(code)
            time.sleep(1)
            # Best effort: the submit click is allowed to fail. Only
            # Exception is caught so Ctrl+C / SystemExit still propagate.
            try:
                continue_btn = page.ele('css:button[type="submit"]', timeout=5)
                if continue_btn:
                    continue_btn.click()
            except Exception:
                log_cb("未找到按钮或已自动跳转...")

        # --- Profile data (name + birthday) ---
        step_cb("填写个人信息...")
        log_cb("填写个人信息...")
        name_input = page.ele('@name=name', timeout=20)
        name_input.input(real_name)
        # Tab moves the focus from the name field to the birthday field.
        page.actions.key_down('Tab').key_up('Tab')

        birth_year, birth_month, birth_day = generate_random_birthday()
        birth_str = birth_year + birth_month + birth_day
        # Typed digit by digit with a short pause between key strokes.
        for digit in birth_str:
            page.actions.type(digit)
            time.sleep(0.1)

        time.sleep(1.5)
        final_reg_btn = page.ele('css:button[type="submit"]', timeout=20)
        final_reg_btn.click()

        # --- Wait (max. ~30 s) until the URL is chatgpt.com outside auth/login ---
        step_cb("等待进入主页...")
        log_cb("等待进入主页...")
        for _ in range(30):
            current_url = page.url
            on_home = (
                'chatgpt.com' in current_url
                and 'auth' not in current_url
                and 'login' not in current_url
            )
            if on_home:
                break
            time.sleep(1)
        else:
            # Same warning the interactive flow emits; execution continues.
            log_cb("⚠ 等待主页超时,尝试继续...")

        # Extra settle time before running the script.
        time.sleep(3)

        # --- Checkout redirect ---
        step_cb("跳转到支付页...")
        log_cb("跳转到支付页...")
        checkout_result = page.run_js(_REGISTRATION_CHECKOUT_JS)
        # The script returns a status string; log it so failures such as a
        # missing access token are visible instead of silently ignored.
        log_cb(f"JS 执行结果: {checkout_result}")

        # url_change may signal a timeout either by raising or by returning
        # a falsy value (depends on DrissionPage settings); in both cases
        # fall back to a short fixed wait.
        try:
            reached_checkout = page.wait.url_change('pay.openai.com', timeout=15)
        except Exception:
            reached_checkout = False
        if not reached_checkout:
            time.sleep(2)

        # --- Payment ---
        step_cb("执行 SEPA 支付...")
        log_cb("执行支付流程...")
        result = run_payment_flow(page, email, step_cb)

        if result and result.get("token"):
            step_cb("注册成功!")
            return {
                "success": True,
                "account": email,
                "password": password,
                "token": result["token"],
                "account_id": result.get("account_id", "")
            }
        return {"success": False, "error": "支付流程失败", "account": email, "password": password}

    except Exception as e:
        return {"success": False, "error": str(e), "account": email, "password": password}
    finally:
        if page is not None:
            # Best effort: a failing quit() must not mask the result above;
            # cleanup_chrome_processes() is still called afterwards.
            try:
                page.quit()
            except Exception:
                pass
        cleanup_chrome_processes()


if __name__ == "__main__":
    run_main_process()