""" Author: muyyg Project: Subscription Automation (DrissionPage Version) Created: 2026-01-12 Version: 3.1-hybrid (支持协议模式) 模式说明: - browser: 浏览器自动化模式 (默认),全程使用 DrissionPage 浏览器自动化 - api: 协议模式,使用 API 快速完成注册,仅支付环节使用浏览器 """ import time import random import string import re import sys import os import platform import subprocess import tempfile import requests from pathlib import Path from DrissionPage import ChromiumPage, ChromiumOptions # 导入协议模式模块 try: from api_register import ( ChatGPTAPIRegister, api_register_flow, api_login_flow, is_api_mode_available, get_verification_code_api, ShutdownRequested, ) API_MODE_AVAILABLE = is_api_mode_available() except ImportError: API_MODE_AVAILABLE = False ChatGPTAPIRegister = None api_register_flow = None api_login_flow = None ShutdownRequested = Exception # 回退到基础异常类 # 导入 Stripe API 支付模块 try: from stripe_api import ( StripePaymentAPI, api_payment_with_retry, is_stripe_api_available, ) STRIPE_API_AVAILABLE = is_stripe_api_available() except ImportError: STRIPE_API_AVAILABLE = False StripePaymentAPI = None api_payment_with_retry = None # ================= 配置加载 ================= try: import tomllib except ImportError: try: import tomli as tomllib except ImportError: tomllib = None BASE_DIR = Path(__file__).parent CONFIG_FILE = BASE_DIR / "config.toml" def _load_config(): """从 config.toml 加载配置""" if tomllib is None or not CONFIG_FILE.exists(): return {} try: with open(CONFIG_FILE, "rb") as f: return tomllib.load(f) except Exception: return {} _cfg = _load_config() _autogptplus = _cfg.get("autogptplus", {}) # ================= 核心配置区域 ================= # 从 config.toml [autogptplus] 读取,如未配置则使用默认值 # 1. 管理员 Token MAIL_API_TOKEN = _autogptplus.get("mail_api_token", "") # 2. 你的域名后缀(随机选择) EMAIL_DOMAINS = _autogptplus.get("email_domains", []) # 3. Cloud-Mail 部署地址 MAIL_API_BASE = _autogptplus.get("mail_api_base", "") # 4. 接口路径 (邮件查询) MAIL_API_PATH = "/api/public/emailList" # 5. SEPA IBAN 列表 (从配置文件读取) SEPA_IBANS = _autogptplus.get("sepa_ibans", []) # 6. 随机指纹开关 RANDOM_FINGERPRINT = _autogptplus.get("random_fingerprint", True) # 7. 注册模式: "browser" (浏览器自动化) 或 "api" (协议模式) # 默认使用 API 模式(更快),如果 curl_cffi 不可用则自动回退到浏览器模式 REGISTER_MODE = _autogptplus.get("register_mode", "api") # 8. 协议模式代理 (仅协议模式使用) API_PROXY = _autogptplus.get("api_proxy", "") # ================= 浏览器指纹 ================= FINGERPRINTS = [ # NVIDIA 显卡 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3080 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4070 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1440} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/141.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4080 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 3840, "height": 2160} }, # AMD 显卡 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (AMD)", "webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6800 XT Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1440} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (AMD)", "webgl_renderer": "ANGLE (AMD, AMD Radeon RX 7900 XTX Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 3840, "height": 2160} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (AMD)", "webgl_renderer": "ANGLE (AMD, AMD Radeon RX 6700 XT Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, # Intel 显卡 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (Intel)", "webgl_renderer": "ANGLE (Intel, Intel(R) UHD Graphics 630 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (Intel)", "webgl_renderer": "ANGLE (Intel, Intel(R) Iris Xe Graphics Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (Intel)", "webgl_renderer": "ANGLE (Intel, Intel(R) Arc A770 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1440} }, # 笔记本配置 { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3050 Laptop GPU Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} }, { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/139.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 4060 Laptop GPU Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 2560, "height": 1600} }, ] def get_random_fingerprint() -> dict: """随机获取一个浏览器指纹""" return random.choice(FINGERPRINTS) def inject_fingerprint(page, fingerprint: dict): """注入浏览器指纹伪装脚本""" global _last_log_message, _last_log_time try: webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. (NVIDIA)") webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)") plat = fingerprint.get("platform", "Win32") screen = fingerprint.get("screen", {"width": 1920, "height": 1080}) # 使用 try-catch 包裹每个属性定义,避免 Linux 上属性不可重定义的错误 js_script = f''' (function() {{ // 伪装 WebGL 指纹 try {{ const getParameterProxyHandler = {{ apply: function(target, thisArg, args) {{ const param = args[0]; if (param === 37445) return "{webgl_vendor}"; if (param === 37446) return "{webgl_renderer}"; return Reflect.apply(target, thisArg, args); }} }}; const originalGetParameter = WebGLRenderingContext.prototype.getParameter; WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler); if (typeof WebGL2RenderingContext !== 'undefined') {{ const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter; WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler); }} }} catch(e) {{}} // 伪装 platform try {{ Object.defineProperty(navigator, 'platform', {{ get: () => "{plat}", configurable: true }}); }} catch(e) {{}} // 伪装屏幕分辨率 try {{ Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]}, configurable: true }}); Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]}, configurable: true }}); Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]}, configurable: true }}); Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]}, configurable: true }}); }} catch(e) {{}} // 隐藏 webdriver 特征 try {{ Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined, configurable: true }}); }} catch(e) {{}} // 伪装 languages try {{ Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"], configurable: true }}); }} catch(e) {{}} // 伪装 plugins try {{ Object.defineProperty(navigator, 'plugins', {{ get: () => [ {{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }}, {{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }}, {{ name: "Native Client", filename: "internal-nacl-plugin" }} ], configurable: true }}); }} catch(e) {{}} }})(); ''' page.run_js(js_script) # 获取浏览器语言设置 try: browser_lang = page.run_js('return navigator.language || navigator.userLanguage || "unknown"') except: browser_lang = "unknown" # 避免重复日志:1秒内相同消息不重复输出 current_time = time.time() log_msg = f"已注入: {webgl_renderer} | {screen['width']}x{screen['height']} | 语言: {browser_lang}" if log_msg != _last_log_message or current_time - _last_log_time > 1: log_status("指纹", log_msg) _last_log_message = log_msg _last_log_time = current_time except Exception as e: log_status("指纹", f"注入失败: {e}") # ================= IBAN 管理函数 ================= IBAN_FILE = BASE_DIR / "sepa_ibans.txt" def load_ibans_from_file(): """从文件加载 IBAN 列表""" if not IBAN_FILE.exists(): return [] try: with open(IBAN_FILE, "r", encoding="utf-8") as f: ibans = [line.strip() for line in f if line.strip() and line.strip().startswith("DE")] return ibans except Exception: return [] def save_ibans_to_file(ibans: list): """保存 IBAN 列表到文件""" try: with open(IBAN_FILE, "w", encoding="utf-8") as f: f.write("\n".join(ibans)) return True except Exception: return False def get_sepa_ibans(): """获取 SEPA IBAN 列表 (优先从文件读取)""" file_ibans = load_ibans_from_file() if file_ibans: return file_ibans return SEPA_IBANS def add_sepa_ibans(new_ibans: list) -> tuple: """添加 IBAN 到列表 Args: new_ibans: 新的 IBAN 列表 Returns: tuple: (添加数量, 跳过数量, 当前总数) """ current = set(load_ibans_from_file()) added = 0 skipped = 0 for iban in new_ibans: iban = iban.strip().upper() if not iban or not iban.startswith("DE"): continue if iban in current: skipped += 1 else: current.add(iban) added += 1 save_ibans_to_file(sorted(current)) return added, skipped, len(current) def clear_sepa_ibans(): """清空 IBAN 列表""" if IBAN_FILE.exists(): IBAN_FILE.unlink() return True # ================= 域名管理函数 ================= DOMAIN_FILE = BASE_DIR / "email_domains.txt" def load_domains_from_file(): """从文件加载域名列表""" if not DOMAIN_FILE.exists(): return [] try: with open(DOMAIN_FILE, "r", encoding="utf-8") as f: domains = [line.strip() for line in f if line.strip() and line.strip().startswith("@")] return domains except Exception: return [] def save_domains_to_file(domains: list): """保存域名列表到文件""" try: with open(DOMAIN_FILE, "w", encoding="utf-8") as f: f.write("\n".join(domains)) return True except Exception: return False def get_email_domains(): """获取邮箱域名列表 (合并文件和配置)""" file_domains = set(load_domains_from_file()) config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set() # 合并两个来源的域名 all_domains = file_domains | config_domains return sorted(all_domains) if all_domains else [] def validate_domain_format(domain: str) -> tuple: """验证域名格式是否正确 Args: domain: 要验证的域名 (带或不带@前缀) Returns: tuple: (是否有效, 标准化的域名或错误信息) """ domain = domain.strip().lower() # 移除开头的引号和尾部特殊字符 domain = domain.strip('"\'') # 确保以 @ 开头 if not domain.startswith("@"): domain = "@" + domain # 移除尾部可能的引号或逗号 domain = domain.rstrip('",\'') # 基本长度检查 (至少 @x.y) if len(domain) < 4: return False, "域名太短" # 提取 @ 后面的部分进行验证 domain_part = domain[1:] # 去掉 @ # 检查是否包含至少一个点 if "." not in domain_part: return False, "域名缺少点号" # 检查点的位置 (不能在开头或结尾) if domain_part.startswith(".") or domain_part.endswith("."): return False, "点号位置不正确" # 检查不能有连续的点 if ".." in domain_part: return False, "不能有连续的点号" # 检查每个部分是否有效 parts = domain_part.split(".") for part in parts: if not part: return False, "域名部分为空" # 检查是否只包含有效字符 (字母、数字、连字符) if not all(c.isalnum() or c == "-" for c in part): return False, f"域名包含无效字符" # 不能以连字符开头或结尾 if part.startswith("-") or part.endswith("-"): return False, "域名部分不能以连字符开头或结尾" # 顶级域名至少2个字符 if len(parts[-1]) < 2: return False, "顶级域名太短" return True, domain def add_email_domains(new_domains: list) -> tuple: """添加域名到列表 Args: new_domains: 新的域名列表 Returns: tuple: (添加数量, 跳过数量, 无效数量, 当前总数) """ # 获取当前所有域名(文件 + 配置) current = set(load_domains_from_file()) config_domains = set(EMAIL_DOMAINS) if EMAIL_DOMAINS else set() all_existing = current | config_domains added = 0 skipped = 0 invalid = 0 for domain in new_domains: # 验证域名格式 is_valid, result = validate_domain_format(domain) if not is_valid: invalid += 1 continue domain = result # 使用标准化后的域名 if domain in all_existing: skipped += 1 else: current.add(domain) all_existing.add(domain) added += 1 # 只保存通过 Bot 添加的域名到文件 save_domains_to_file(sorted(current)) return added, skipped, invalid, len(all_existing) def remove_email_domain(domain: str) -> bool: """删除指定域名 (只能删除通过 Bot 添加的域名) Args: domain: 要删除的域名 Returns: bool: 是否删除成功 """ current = set(load_domains_from_file()) domain = domain.strip().lower() if not domain.startswith("@"): domain = "@" + domain if domain in current: current.remove(domain) save_domains_to_file(sorted(current)) return True return False def get_file_domains_count() -> int: """获取txt文件中的域名数量 (不包含config配置的)""" return len(load_domains_from_file()) def clear_email_domains() -> int: """清空域名列表 (只清空txt文件,保留config配置) Returns: int: 被清空的域名数量 """ count = len(load_domains_from_file()) if DOMAIN_FILE.exists(): DOMAIN_FILE.unlink() return count # ================= 固定配置 ================= TARGET_URL = "https://chatgpt.com" def generate_random_birthday(): """生成随机生日 (2000-2004年)""" year = random.randint(2000, 2004) month = random.randint(1, 12) # 根据月份确定天数 if month in [1, 3, 5, 7, 8, 10, 12]: max_day = 31 elif month in [4, 6, 9, 11]: max_day = 30 else: # 2月 max_day = 29 if year % 4 == 0 else 28 day = random.randint(1, max_day) return str(year), f"{month:02d}", f"{day:02d}" def _detect_page_language(page) -> str: """检测页面语言 Returns: str: 'zh' 中文, 'en' 英文 """ try: html = page.html # 检查中文关键词 chinese_keywords = ['确认', '继续', '登录', '注册', '验证', '邮箱', '密码', '姓名', '生日'] for keyword in chinese_keywords: if keyword in html: return 'zh' return 'en' except Exception: return 'en' # 默认英文 def _input_birthday_precise(page, birth_year: str, birth_month: str, birth_day: str) -> bool: """精确输入生日 (使用 data-type 选择器定位输入框) 中文界面: yyyy/mm/dd (年/月/日) 英文界面: mm/dd/yyyy (月/日/年) Args: page: 浏览器页面对象 birth_year: 年份 (如 "2000") birth_month: 月份 (如 "07") birth_day: 日期 (如 "15") Returns: bool: 是否成功 """ try: # 使用 data-type 属性精确定位三个输入框 year_input = page.ele('css:[data-type="year"]', timeout=5) month_input = page.ele('css:[data-type="month"]', timeout=3) day_input = page.ele('css:[data-type="day"]', timeout=3) if not all([year_input, month_input, day_input]): log_progress("[!] 未找到完整的生日输入框 (data-type),尝试备用方案...") return False # 检测页面语言 lang = _detect_page_language(page) log_progress(f"生日: {birth_year}-{birth_month}-{birth_day} (界面: {lang})") # 根据语言决定输入顺序 if lang == 'zh': # 中文: 年 -> 月 -> 日 input_order = [(year_input, birth_year), (month_input, birth_month), (day_input, birth_day)] else: # 英文: 月 -> 日 -> 年 input_order = [(month_input, birth_month), (day_input, birth_day), (year_input, birth_year)] # 按顺序输入 for input_elem, value in input_order: try: input_elem.click() time.sleep(0.15) input_elem.input(value, clear=True) time.sleep(0.2) except Exception as e: log_progress(f"[!] 生日字段输入异常: {e}") return False log_progress("[OK] 生日已输入 (精确模式)") return True except Exception as e: log_progress(f"[!] 精确生日输入失败: {e}") return False def _input_birthday_fallback(page, birth_year: str, birth_month: str, birth_day: str): """备用生日输入方式 (Tab 键切换 + 逐字符输入) Args: page: 浏览器页面对象 birth_year: 年份 birth_month: 月份 birth_day: 日期 """ log_progress(f"生日: {birth_year}-{birth_month}-{birth_day} (备用模式)") # 使用 Tab 键切换到生日字段 page.actions.key_down('Tab').key_up('Tab') time.sleep(0.3) # 逐字符输入 birth_str = birth_year + birth_month + birth_day for digit in birth_str: page.actions.type(digit) time.sleep(0.1) log_progress("[OK] 生日已输入 (备用模式)") # 地址格式: (街道, 邮编, 城市) SEPA_ADDRESSES = [ # 柏林 ("Alexanderplatz 1", "10178", "Berlin"), ("Unter den Linden 77", "10117", "Berlin"), ("Kurfürstendamm 21", "10719", "Berlin"), ("Friedrichstraße 43", "10117", "Berlin"), ("Potsdamer Platz 1", "10785", "Berlin"), ("Tauentzienstraße 9", "10789", "Berlin"), # 慕尼黑 ("Marienplatz 8", "80331", "München"), ("Leopoldstraße 32", "80802", "München"), ("Maximilianstraße 17", "80539", "München"), ("Kaufingerstraße 28", "80331", "München"), ("Sendlinger Straße 3", "80331", "München"), # 汉堡 ("Mönckebergstraße 16", "20095", "Hamburg"), ("Jungfernstieg 38", "20354", "Hamburg"), ("Spitalerstraße 12", "20095", "Hamburg"), ("Neuer Wall 50", "20354", "Hamburg"), # 法兰克福 ("Zeil 106", "60313", "Frankfurt am Main"), ("Kaiserstraße 62", "60329", "Frankfurt am Main"), ("Goethestraße 1", "60313", "Frankfurt am Main"), ("Große Bockenheimer Str. 2", "60313", "Frankfurt am Main"), # 科隆 ("Hohe Straße 111", "50667", "Köln"), ("Schildergasse 24", "50667", "Köln"), ("Breite Straße 80", "50667", "Köln"), # 斯图加特 ("Königstraße 2", "70173", "Stuttgart"), ("Calwer Straße 19", "70173", "Stuttgart"), ("Schulstraße 5", "70173", "Stuttgart"), # 杜塞尔多夫 ("Königsallee 60", "40212", "Düsseldorf"), ("Schadowstraße 11", "40212", "Düsseldorf"), ("Flinger Straße 36", "40213", "Düsseldorf"), # 莱比锡 ("Grimmaische Straße 25", "04109", "Leipzig"), ("Petersstraße 36", "04109", "Leipzig"), # 德累斯顿 ("Prager Straße 12", "01069", "Dresden"), ("Altmarkt 10", "01067", "Dresden"), # 纽伦堡 ("Karolinenstraße 12", "90402", "Nürnberg"), ("Breite Gasse 23", "90402", "Nürnberg"), # 汉诺威 ("Georgstraße 10", "30159", "Hannover"), ("Bahnhofstraße 5", "30159", "Hannover"), # 不来梅 ("Obernstraße 2", "28195", "Bremen"), ("Sögestraße 18", "28195", "Bremen"), ] FIRST_NAMES = [ # 德国常见男性名 "Lukas", "Leon", "Maximilian", "Felix", "Paul", "Jonas", "Tim", "David", "Niklas", "Jan", "Philipp", "Moritz", "Alexander", "Sebastian", "Florian", "Julian", "Tobias", "Simon", "Daniel", "Christian", "Markus", "Thomas", "Michael", "Stefan", "Andreas", "Martin", "Matthias", "Benjamin", "Patrick", # 德国常见女性名 "Anna", "Laura", "Julia", "Lena", "Sarah", "Lisa", "Marie", "Sophie", "Katharina", "Hannah", "Emma", "Mia", "Lea", "Johanna", "Clara", "Charlotte", "Emilia", "Luisa", "Nina", "Elena", "Melanie", "Christina", "Sandra", "Nicole", "Sabine", "Claudia", "Petra", "Monika", "Stefanie", ] LAST_NAMES = [ # 德国最常见姓氏 "Müller", "Schmidt", "Schneider", "Fischer", "Weber", "Meyer", "Wagner", "Becker", "Schulz", "Hoffmann", "Schäfer", "Koch", "Bauer", "Richter", "Klein", "Wolf", "Schröder", "Neumann", "Schwarz", "Zimmermann", "Braun", "Krüger", "Hofmann", "Hartmann", "Lange", "Schmitt", "Werner", "Schmitz", "Krause", "Meier", "Lehmann", "Schmid", "Schulze", "Maier", "Köhler", "Herrmann", "König", "Walter", "Mayer", "Huber", "Kaiser", "Fuchs", "Peters", "Lang", "Scholz", "Möller", "Weiß", "Jung", "Hahn", "Schubert", ] # ================= 工具函数 ================= # 日志去重缓存 _last_log_message = "" _last_log_time = 0 def cleanup_chrome_processes(): """清理残留的 Chrome 进程 (跨平台支持)""" global _last_log_message, _last_log_time try: if platform.system() == "Windows": # Windows: 使用 taskkill 清理 chromedriver 和 chrome try: subprocess.run( ['taskkill', '/F', '/IM', 'chromedriver.exe'], capture_output=True, timeout=5 ) except Exception: pass # 清理无头模式的 chrome 进程 (带 --headless 参数的) try: result = subprocess.run( ['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'], capture_output=True, text=True, timeout=5 ) for line in result.stdout.strip().split('\n'): pid = line.strip() if pid.isdigit(): subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5) except Exception: pass else: # Linux/Mac: 使用 pkill try: subprocess.run( ['pkill', '-f', 'chromedriver'], capture_output=True, timeout=5 ) except Exception: pass # 清理无头模式的 chrome 进程 try: subprocess.run( ['pkill', '-f', 'chrome.*--headless'], capture_output=True, timeout=5 ) except Exception: pass # 避免重复日志:1秒内相同消息不重复输出 current_time = time.time() log_msg = "已清理 Chrome 残留进程" if log_msg != _last_log_message or current_time - _last_log_time > 1: log_status("清理", log_msg) _last_log_message = log_msg _last_log_time = current_time except Exception: pass # 静默处理,不影响主流程 def log_status(step, message): timestamp = time.strftime("%H:%M:%S") print(f"[{timestamp}] [{step}] {message}") sys.stdout.flush() def log_progress(message): print(f" {message}") sys.stdout.flush() def save_account(email, password, token, account_id=""): """保存账号信息到 JSON 文件""" import json accounts_file = "accounts.json" # 读取现有账号 accounts = [] try: with open(accounts_file, 'r', encoding='utf-8') as f: accounts = json.load(f) except: pass # 添加新账号 account_data = { "account": email, "password": password, "token": token } if account_id: account_data["account_id"] = account_id accounts.append(account_data) # 保存 with open(accounts_file, 'w', encoding='utf-8') as f: json.dump(accounts, f, ensure_ascii=False, indent=2) log_status("保存", f"账号已保存到 {accounts_file}") def get_verification_content(target_email, max_retries=90): log_status("API监听", f"正在监听邮件 ({MAIL_API_PATH})...") headers = { "Authorization": MAIL_API_TOKEN, "Content-Type": "application/json" } start_time = time.time() for i in range(max_retries): elapsed = int(time.time() - start_time) try: url = f"{MAIL_API_BASE}{MAIL_API_PATH}" payload = { "toEmail": target_email, "timeSort": "desc", "size": 20 } resp = requests.post(url, headers=headers, json=payload, timeout=10) if resp.status_code == 200: data = resp.json() if data.get('code') == 200: mails = data.get('data', []) if mails: for mail in mails: log_status("捕获", "✅ 成功抓取到目标邮件!") html_body = mail.get('content') or mail.get('text') or str(mail) # 提取验证码 code_match = re.search(r'\b(\d{6})\b', html_body) if code_match: code = code_match.group(1) log_status("解析", f"提取到验证码: {code}") return {"type": "code", "val": code} # 提取链接 link_match = re.search(r'href="(https://.*openai\.com/.*verification.*)"', html_body) if not link_match: link_match = re.search(r'href="(https://auth0\.openai\.com/u/login/identifier\?state=[^"]+)"', html_body) if link_match: link = link_match.group(1) log_status("解析", f"提取到链接: {link[:30]}...") return {"type": "link", "val": link} except: pass if i % 5 == 0: print(f" [监听中] 已耗时 {elapsed}秒...") sys.stdout.flush() time.sleep(2) log_status("超时", "[X] 未能获取验证码。") return None def _is_shutdown_requested(): """检查是否收到停止请求""" try: import run return run._shutdown_requested except Exception: return False def _is_connection_lost(error_msg): """检查是否是连接断开错误(通常由 /stop 命令导致)""" disconnect_keywords = [ "connection to the page has been disconnected", "page has been closed", "target closed", "session closed", "browser has disconnected", "no such window", "invalid session id" ] error_lower = str(error_msg).lower() return any(kw in error_lower for kw in disconnect_keywords) def _fill_payment_form(page, email, sepa_iban, street, postal_code, city, account_name, step_callback=None): """填写支付表单(内部函数,供重试使用) Returns: tuple: (success: bool, error_type: str, error_msg: str) error_type: "stopped", "iban_error", "fatal", "success" """ def step_cb(step): if step_callback: step_callback(step) try: # ========== 步骤 1: 填写邮箱 ========== step_cb("填写支付邮箱...") log_progress("[1] 填写邮箱...") if _is_shutdown_requested(): return False, "stopped", "用户停止" email_input = page.ele('#email', timeout=10) if not email_input: email_input = page.ele('@name=email', timeout=5) if not email_input: email_input = page.ele('css:input[type="email"]', timeout=5) if not email_input: return False, "fatal", "邮箱输入框未找到" email_input.clear() email_input.input(email) log_progress(f"[OK] 已填写邮箱: {email}") time.sleep(1) # ========== 步骤 2: 选择 SEPA ========== step_cb("选择 SEPA 支付方式...") log_progress("[2] 选择 SEPA...") time.sleep(1) sepa_clicked = False sepa_selectors = [ '@data-testid=sepa_debit-accordion-item-button', 'css:button[data-testid*="sepa"]', ] for selector in sepa_selectors: try: sepa_btn = page.ele(selector, timeout=2) if sepa_btn: page.run_js('arguments[0].click()', sepa_btn) sepa_clicked = True time.sleep(1) break except: continue if not sepa_clicked: try: result = page.run_js(''' const btns = document.querySelectorAll('button'); for(let btn of btns) { if(btn.innerText.includes('SEPA')) { btn.click(); return true; } } return false; ''') if result: sepa_clicked = True time.sleep(1) except: pass if not sepa_clicked: return False, "fatal", "SEPA 选择失败" # 验证 SEPA 是否展开 try: iban_check = page.ele('#iban', timeout=5) if not iban_check: return False, "fatal", "SEPA 未展开" except: return False, "fatal", "SEPA 未展开" # ========== 步骤 3: 填写 IBAN ========== step_cb("填写 IBAN...") log_progress("[3] 填写 IBAN...") try: iban_input = page.ele('#iban', timeout=5) if not iban_input: return False, "fatal", "IBAN 输入框未找到" iban_input.input(sepa_iban) except Exception as e: return False, "fatal", f"IBAN 填写失败: {e}" # ========== 步骤 4: 填写账户姓名 ========== step_cb("填写账户姓名...") log_progress("[4] 填写姓名...") try: name_input = page.ele('#billingName', timeout=3) if not name_input: name_input = page.ele('@name=billingName', timeout=2) if name_input: name_input.input(account_name) except: pass # ========== 步骤 5: 填写地址 ========== step_cb("填写账单地址...") log_progress("[5] 填写地址...") try: addr_input = page.ele('#billingAddressLine1', timeout=1) if not addr_input: try: manual_btn = page.ele('@data-testid=manual-address-entry', timeout=1) if manual_btn: manual_btn.click() time.sleep(0.3) except: pass addr_input = page.ele('#billingAddressLine1', timeout=2) if addr_input: addr_input.input(street) postal_input = page.ele('#billingPostalCode', timeout=1) if postal_input: postal_input.input(postal_code) city_input = page.ele('#billingLocality', timeout=1) if city_input: city_input.input(city) page.actions.key_down('Escape').key_up('Escape') except: pass # ========== 步骤 6: 勾选条款 ========== step_cb("勾选服务条款...") log_progress("[6] 勾选条款...") try: terms_checkbox = page.ele('#termsOfServiceConsentCheckbox', timeout=3) if terms_checkbox: terms_checkbox.click() except: pass # ========== 步骤 7: 点击订阅 ========== step_cb("提交订阅...") log_progress("[7] 点击订阅...") time.sleep(1) subscribe_clicked = False # 方式1: DrissionPage click try: subscribe_btn = page.ele('css:button[type="submit"]', timeout=5) if subscribe_btn: page.run_js('arguments[0].scrollIntoView({block: "center"})', subscribe_btn) time.sleep(0.5) subscribe_btn.click() subscribe_clicked = True log_progress("[OK] 点击订阅按钮 (方式1)") except Exception as e: log_progress(f"[!] 方式1点击失败: {e}") # 方式2: JS click if not subscribe_clicked: try: result = page.run_js(''' const btn = document.querySelector('button[type="submit"]'); if (btn) { btn.scrollIntoView({block: "center"}); btn.click(); return "clicked"; } return "not found"; ''') if result == "clicked": subscribe_clicked = True log_progress("[OK] 点击订阅按钮 (方式2-JS)") except Exception as e: log_progress(f"[!] 方式2点击失败: {e}") # 方式3: dispatchEvent if not subscribe_clicked: try: result = page.run_js(''' const btn = document.querySelector('button[type="submit"]'); if (btn) { const event = new MouseEvent('click', { bubbles: true, cancelable: true, view: window }); btn.dispatchEvent(event); return "dispatched"; } return "not found"; ''') if result == "dispatched": subscribe_clicked = True log_progress("[OK] 点击订阅按钮 (方式3-dispatchEvent)") except Exception as e: log_progress(f"[!] 方式3点击失败: {e}") return True, "success", "" except Exception as e: error_msg = str(e) if _is_connection_lost(error_msg): return False, "stopped", "浏览器连接断开" return False, "fatal", str(e) def run_payment_flow(page, email, step_callback=None, max_retries: int = 3): """执行 SEPA 支付流程 - 支持 IBAN 重试 Args: page: 浏览器页面对象 email: 邮箱地址 step_callback: 步骤回调函数 (step: str) max_retries: IBAN 重试次数 Returns: dict: 成功时返回 {"token": ..., "account_id": ...} 被停止时返回 {"stopped": True} 失败时返回 None """ def step_cb(step): if step_callback: step_callback(step) # 检查停止请求 if _is_shutdown_requested(): log_progress("[!] 检测到停止请求,中断支付流程") return {"stopped": True} log_status("支付流程", "开始处理 Stripe 支付页...") try: # 等待支付页加载完成 step_cb("等待支付页加载...") log_status("支付页", "等待支付页加载...") time.sleep(3) # 检查 IBAN 列表 ibans = get_sepa_ibans() if not ibans: log_progress("[X] 没有可用的 IBAN,请先通过 Bot 导入") return None # 用于跟踪已使用的 IBAN,避免重复 used_ibans = set() last_error = "" for retry in range(max_retries): if _is_shutdown_requested(): return {"stopped": True} # 选择一个未使用过的 IBAN available_ibans = [iban for iban in ibans if iban not in used_ibans] if not available_ibans: # 所有 IBAN 都用过了,重置 available_ibans = ibans used_ibans.clear() sepa_iban = random.choice(available_ibans) used_ibans.add(sepa_iban) street, postal_code, city = random.choice(SEPA_ADDRESSES) account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" if retry == 0: log_progress(f"使用 IBAN: {sepa_iban[:8]}...") log_progress(f"使用地址: {street}, {postal_code} {city}") log_progress(f"账户名: {account_name}") else: log_progress(f"[!] 重试 {retry}/{max_retries-1},更换 IBAN: {sepa_iban[:8]}...") # 如果是重试,需要刷新页面重新填写 if retry > 0: log_progress("[!] 刷新页面重新填写...") try: page.refresh() time.sleep(3) except: pass # 填写表单 success, error_type, error_msg = _fill_payment_form( page, email, sepa_iban, street, postal_code, city, account_name, step_callback ) if error_type == "stopped": return {"stopped": True} if not success: last_error = error_msg if error_type == "fatal": # 致命错误,不重试 log_progress(f"[X] 致命错误: {error_msg}") return None # IBAN 相关错误,继续重试 log_progress(f"[!] 表单填写失败: {error_msg}") continue # 表单填写成功,等待支付结果 step_cb("等待支付处理...") log_status("等待", "等待支付处理(超时90秒)...") payment_success = False start_time = time.time() max_wait = 90 last_log_time = 0 last_url = "" while time.time() - start_time < max_wait: try: current_url = page.url if current_url != last_url: log_progress(f"[URL变化] {current_url[:80]}...") last_url = current_url if 'payments/success' in current_url or 'success-team' in current_url: payment_success = True log_status("成功", "[OK] 支付成功!") break if 'error' in current_url.lower() or 'failed' in current_url.lower(): log_status("失败", f"[X] 支付失败: {current_url}") last_error = "支付页面显示错误" break # 检查页面错误提示 try: error_elem = page.ele('css:[class*="error"], [class*="Error"], [role="alert"]', timeout=0.5) if error_elem and error_elem.text: error_text = error_elem.text[:100] if error_text and ('error' in error_text.lower() or 'invalid' in error_text.lower() or 'unusable' in error_text.lower()): log_progress(f"[!] 页面错误: {error_text}") # IBAN 相关错误,标记重试 if 'iban' in error_text.lower() or 'bank' in error_text.lower() or 'unusable' in error_text.lower(): last_error = f"IBAN 错误: {error_text}" break except: pass except Exception as e: log_progress(f"[!] 页面访问异常: {str(e)[:50]}") if _is_shutdown_requested(): return {"stopped": True} elapsed = int(time.time() - start_time) if elapsed >= last_log_time + 10: log_progress(f"[等待中] 已等待 {elapsed} 秒...") last_log_time = elapsed if elapsed % 30 == 0: try: if 'pay.openai.com' in current_url or 'checkout.stripe.com' in current_url: log_progress("[!] 仍在支付页,尝试重新点击提交") page.run_js('document.querySelector("button[type=submit]")?.click()') except: pass time.sleep(2) if payment_success: # 支付成功,获取 token step_cb("获取 Token...") log_status("获取", "正在获取 access token...") time.sleep(1) page.get("https://chatgpt.com/api/auth/session") time.sleep(1) try: session_text = page.ele('tag:pre', timeout=5).text import json session_data = json.loads(session_text) access_token = session_data.get('accessToken', '') if access_token: log_status("成功", f"Token: {access_token[:50]}...") step_cb("获取 Account ID...") account_id = fetch_account_id_from_session(session_data) if not account_id: account_id = fetch_account_id(page, access_token) if account_id: log_progress(f"account_id: {account_id}") return { "token": access_token, "account_id": account_id } else: log_progress("[X] 未找到 accessToken") return None except Exception as e: log_progress(f"[X] 获取 token 失败: {e}") return None # 支付未成功,检查是否需要重试 if retry < max_retries - 1: log_progress(f"[!] 支付未成功: {last_error},准备重试...") continue # 所有重试都失败 log_status("超时", f"[X] 支付失败(已重试 {max_retries} 次): {last_error}") return None except Exception as e: error_msg = str(e) if _is_connection_lost(error_msg): log_status("停止", "[!] 浏览器连接断开,支付流程已中断") return {"stopped": True} log_status("错误", f"[X] 支付流程异常: {e}") return None def api_pay_with_session(reg, email: str, proxy: str = None, step_callback=None, max_retries: int = 3): """使用纯 API 完成支付流程(带 IBAN 重试) Args: reg: ChatGPTAPIRegister 对象 email: 邮箱 proxy: 代理地址 step_callback: 步骤回调 max_retries: IBAN 重试次数 Returns: dict: {"token": ..., "account_id": ...} 或 None """ def step_cb(step): if step_callback: step_callback(step) if not STRIPE_API_AVAILABLE: log_status("警告", "Stripe API 模式不可用,回退到浏览器模式") return None step_cb("获取支付页 URL...") # 通过 API 获取支付页 URL checkout_url = reg.get_checkout_url() if not checkout_url: log_status("失败", "无法获取支付页 URL") return None log_progress(f"[OK] 支付页: {checkout_url[:60]}...") # 使用 API 支付(带 IBAN 重试) step_cb("API 支付处理中...") def get_iban(): ibans = get_sepa_ibans() return random.choice(ibans) if ibans else "" def get_address(): return random.choice(SEPA_ADDRESSES) def get_name(): return f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" success, error = api_payment_with_retry( checkout_url=checkout_url, email=email, session=reg.session if hasattr(reg, 'session') else None, proxy=proxy, max_retries=max_retries, get_iban_func=get_iban, get_address_func=get_address, get_name_func=get_name, progress_callback=lambda msg: log_progress(msg) ) if not success: log_status("失败", f"API 支付失败: {error}") return None # 支付成功,获取 token 和 account_id step_cb("获取 Token...") log_status("获取", "正在获取 access token...") token = reg.get_session_token() if not token: log_status("警告", "支付成功但无法获取 token") return None log_status("成功", f"Token: {token[:50]}...") # 获取 account_id step_cb("获取 Account ID...") account_id = "" try: headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } resp = requests.get( "https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27", headers=headers, timeout=10 ) if resp.status_code == 200: data = resp.json() accounts = data.get("accounts", {}) for acc_id, acc_info in accounts.items(): if acc_id == "default": continue account_data = acc_info.get("account", {}) plan_type = account_data.get("plan_type", "") if "team" in plan_type.lower(): account_id = acc_id break if not account_id: for acc_id in accounts.keys(): if acc_id != "default": account_id = acc_id break except Exception as e: log_progress(f"获取 account_id 失败: {e}") if account_id: log_progress(f"account_id: {account_id[:8]}...") return { "token": token, "account_id": account_id } def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool = True, step_callback=None, max_retries: int = 3): """使用 API 完成支付流程(纯 API 模式,不使用浏览器) Args: reg: ChatGPTAPIRegister 对象 email: 邮箱 proxy: 代理地址 headless: 已废弃,保留兼容性 step_callback: 步骤回调 max_retries: IBAN 重试次数 Returns: dict: {"token": ..., "account_id": ...} 支付失败返回 {"payment_failed": True} 用户停止返回 {"stopped": True} """ def step_cb(step): if step_callback: step_callback(step) # 使用 API 支付模式 if STRIPE_API_AVAILABLE: log_status("支付", "使用 API 支付模式...") result = api_pay_with_session(reg, email, proxy, step_callback, max_retries=max_retries) if result: return result # API 支付失败,返回特殊标记让调用方重新创建用户 log_status("失败", "API 支付失败,需要重新创建用户") return {"payment_failed": True} else: log_status("错误", "Stripe API 模式不可用,请安装 curl_cffi") return {"payment_failed": True} def fetch_account_id(page, access_token: str) -> str: """通过 API 获取 account_id (使用 requests 直接请求,更快)""" log_status("获取", "正在获取 account_id...") try: # 直接使用 requests 请求 API,比 page.get() + JS fetch 快得多 headers = { "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36" } resp = requests.get( "https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27", headers=headers, timeout=10 ) if resp.status_code == 200: data = resp.json() accounts = data.get("accounts", {}) # 优先查找 Team 账户 for acc_id, acc_info in accounts.items(): if acc_id == "default": continue account_data = acc_info.get("account", {}) plan_type = account_data.get("plan_type", "") if "team" in plan_type.lower(): log_status("成功", f"获取到 account_id: {acc_id[:8]}...") return acc_id # 取第一个非 default 的 for acc_id in accounts.keys(): if acc_id != "default": log_status("成功", f"获取到 account_id: {acc_id[:8]}...") return acc_id else: log_progress(f"API 请求失败: {resp.status_code}") except Exception as e: log_progress(f"获取 account_id 失败: {e}") return "" def fetch_account_id_from_session(session_data: dict) -> str: """直接从 session 数据中提取 account_id (最快方式)""" try: account = session_data.get("account", {}) account_id = account.get("id", "") if account_id: log_status("成功", f"获取到 account_id: {account_id[:8]}...") return account_id except Exception as e: log_progress(f"从 session 提取 account_id 失败: {e}") return "" def run_main_process(): # 检查必要配置 if not MAIL_API_TOKEN or not MAIL_API_BASE or not EMAIL_DOMAINS: print("\n" + "="*60) print("[X] 配置错误: 请在 config.toml 中配置 [autogptplus] 段") print(" - mail_api_token: Cloud Mail API Token") print(" - mail_api_base: Cloud Mail API 地址") print(" - email_domains: 可用邮箱域名列表") print("="*60 + "\n") return # 清理可能残留的 Chrome 调试进程 cleanup_chrome_processes() # === 1. 生成 15 位随机账号 + 同密码 === random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15)) domains = get_email_domains() if not domains: print("\n" + "="*60) print("[X] 配置错误: 没有可用的邮箱域名") print(" 请在 config.toml 中配置 email_domains 或通过 Bot 添加") print("="*60 + "\n") return email_domain = random.choice(domains) email = f"{random_str}{email_domain}" # 生成符合要求的密码:大小写字母+数字+特殊字符,至少12位 password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \ ''.join(random.choices(string.ascii_lowercase, k=8)) + \ ''.join(random.choices(string.digits, k=2)) + \ random.choice('!@#$%') real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" print("\n" + "="*60) log_status("初始化", f"生成账号: {email}") log_status("初始化", f"设置密码: {password}") print("="*60 + "\n") # 检测操作系统 is_linux = platform.system() == "Linux" # 获取随机指纹 fingerprint = None if RANDOM_FINGERPRINT: fingerprint = get_random_fingerprint() log_status("指纹", f"{fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}") else: fingerprint = { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} } log_status("指纹", "使用默认指纹 (RTX 3060)") # 配置 DrissionPage - 与项目 browser_automation.py 保持一致 co = ChromiumOptions() co.set_argument('--no-first-run') # 跳过首次运行 co.set_argument('--disable-infobars') co.set_argument('--incognito') # 无痕模式 co.set_argument('--disable-gpu') # 减少资源占用 co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题 co.set_argument('--no-sandbox') # 服务器环境需要 co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征 co.set_argument('--lang=zh-CN') # 设置语言为中文简体 co.set_argument(f'--user-agent={fingerprint["user_agent"]}') # 设置 User-Agent # Linux 服务器特殊配置 if is_linux: co.set_argument('--disable-software-rasterizer') co.set_argument('--disable-extensions') co.set_argument('--disable-setuid-sandbox') co.set_argument('--single-process') # 某些 Linux 环境需要 co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口 # 尝试查找 Chrome/Chromium 路径 chrome_paths = [ '/usr/bin/google-chrome', '/usr/bin/google-chrome-stable', '/usr/bin/chromium-browser', '/usr/bin/chromium', '/snap/bin/chromium', ] for chrome_path in chrome_paths: if os.path.exists(chrome_path): co.set_browser_path(chrome_path) log_status("浏览器", f"使用浏览器: {chrome_path}") break else: co.auto_port(True) # Windows 使用自动分配端口 co.set_local_port(random.randint(19222, 29999)) # 备用:手动设置随机端口 # 无头模式 (服务器运行) screen = fingerprint.get("screen", {"width": 1920, "height": 1080}) co.set_argument('--headless=new') co.set_argument(f'--window-size={screen["width"]},{screen["height"]}') log_status("浏览器", f"正在启动浏览器 (无头模式, {'Linux' if is_linux else 'Windows'})...") try: page = ChromiumPage(co) # 注入指纹伪装 if RANDOM_FINGERPRINT: inject_fingerprint(page, fingerprint) except Exception as e: log_status("浏览器", f"首次启动失败: {e},尝试清理后重试...") # 清理残留进程后重试 cleanup_chrome_processes() time.sleep(2) # 重新配置 co2 = ChromiumOptions() co2.set_argument('--no-first-run') co2.set_argument('--disable-infobars') co2.set_argument('--incognito') co2.set_argument('--disable-gpu') co2.set_argument('--disable-dev-shm-usage') co2.set_argument('--no-sandbox') co2.set_argument('--disable-blink-features=AutomationControlled') co2.set_argument('--lang=zh-CN') co2.set_argument(f'--user-agent={fingerprint["user_agent"]}') co2.set_argument('--headless=new') co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}') if is_linux: co2.set_argument('--disable-software-rasterizer') co2.set_argument('--disable-extensions') co2.set_argument('--disable-setuid-sandbox') co2.set_argument('--single-process') co2.set_argument('--remote-debugging-port=0') for chrome_path in chrome_paths: if os.path.exists(chrome_path): co2.set_browser_path(chrome_path) break else: co2.set_local_port(random.randint(30000, 39999)) page = ChromiumPage(co2) # 注入指纹伪装 if RANDOM_FINGERPRINT: inject_fingerprint(page, fingerprint) try: # === 注册流程 === log_status("步骤 1/5", "打开 ChatGPT 注册页...") page.get(TARGET_URL) log_progress(f"当前URL: {page.url}") # 使用 data-testid 定位登录按钮 login_btn = page.ele('@data-testid=login-button', timeout=30) if not login_btn: login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10) login_btn.click() time.sleep(2) log_progress(f"当前URL: {page.url}") log_progress("填入邮箱...") email_input = page.ele('@name=email', timeout=30) email_input.input(email) page.ele('xpath://button[@type="submit"]').click() time.sleep(2) log_progress(f"当前URL: {page.url}") log_progress("填入密码...") password_input = page.ele('xpath://input[@type="password"]', timeout=30) password_input.input(password) page.ele('xpath://button[@type="submit"]').click() time.sleep(2) log_progress(f"当前URL: {page.url}") log_status("步骤 2/5", "等待邮件 (All Mail)...") # === 接码与验证 === verify_data = get_verification_content(email) if verify_data: log_status("步骤 3/5", "执行验证...") if verify_data['type'] == 'link': page.new_tab(verify_data['val']) time.sleep(5) log_progress(f"当前URL: {page.url}") elif verify_data['type'] == 'code': code = verify_data['val'] log_progress(f"填入验证码: {code}") try: code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15) code_input.input(code) # 点击继续(使用 type=submit) time.sleep(1) try: log_progress("尝试点击继续按钮...") continue_btn = page.ele('css:button[type="submit"]', timeout=5) if continue_btn: continue_btn.click() except: log_progress("未找到按钮或已自动跳转...") time.sleep(2) log_progress(f"当前URL: {page.url}") except Exception as e: log_progress(f"验证码填入异常: {e}") # === 资料填写 (姓名+生日) === log_status("步骤 4/5", "进入信息填写页...") log_progress(f"当前URL: {page.url}") try: name_input = page.ele('@name=name', timeout=20) name_input.input(real_name) log_progress(f"姓名: {real_name}") # 生成随机生日 birth_year, birth_month, birth_day = generate_random_birthday() # 优先使用精确输入,失败则使用备用方案 if not _input_birthday_precise(page, birth_year, birth_month, birth_day): _input_birthday_fallback(page, birth_year, birth_month, birth_day) time.sleep(1.5) # 点击继续/完成注册(使用 type=submit) final_reg_btn = page.ele('css:button[type="submit"]', timeout=20) final_reg_btn.click() time.sleep(2) log_progress(f"当前URL: {page.url}") # === 检测提交后是否有错误或需要额外操作 === submit_success = False for check_attempt in range(10): current_url = page.url # 检查是否已跳转出 about-you 页面 if 'about-you' not in current_url: log_progress(f"[OK] 页面已跳转: {current_url[:50]}...") submit_success = True break # 检查是否有错误提示 try: error_elem = page.ele('css:[role="alert"], [class*="error"], [class*="Error"]', timeout=1) if error_elem and error_elem.text: log_progress(f"[!] 发现错误提示: {error_elem.text[:50]}") except: pass # 检查是否有未勾选的 checkbox(如服务条款) try: unchecked = page.ele('css:input[type="checkbox"]:not(:checked)', timeout=1) if unchecked: log_progress("发现未勾选的选项,尝试勾选...") unchecked.click() time.sleep(0.5) # 重新点击提交 final_reg_btn = page.ele('css:button[type="submit"]', timeout=5) if final_reg_btn: final_reg_btn.click() time.sleep(2) except: pass # 检查是否有 CAPTCHA try: captcha = page.ele('css:iframe[src*="captcha"], iframe[src*="recaptcha"], [class*="captcha"]', timeout=1) if captcha: log_progress("[!] 检测到 CAPTCHA,需要人工处理或等待...") except: pass time.sleep(1) if not submit_success: log_progress(f"[!] 提交后仍在 about-you 页面,当前URL: {page.url}") # ======================================================= # 【关键节点】等待进入主页后再执行 JS 跳转到支付页 # ======================================================= log_status("订阅", "等待进入 ChatGPT 主页...") # 快速检测循环(减少等待时间) entered_main = False for i in range(20): # 最多等待 10 秒 current_url = page.url # 更宽松的判断:只要不在 auth/login 页面就认为进入了主页 if 'chatgpt.com' in current_url: if '/auth' not in current_url and '/login' not in current_url and 'auth0' not in current_url: log_progress(f"[OK] 已进入主页: {current_url}") entered_main = True break time.sleep(0.5) # 减少等待间隔 if not entered_main: log_progress("[!] 等待主页超时,尝试继续...") log_progress(f"当前URL: {page.url}") # 尝试直接访问主页 page.get("https://chatgpt.com/") time.sleep(2) log_progress(f"跳转后URL: {page.url}") log_status("订阅", "执行 JS 跳转到支付页...") # 直接执行 JS 跳转到支付页(无需额外等待,JS 会自动获取 session) checkout_js = ''' (async function(){ try { const t = await(await fetch("/api/auth/session")).json(); if(!t.accessToken){ return "请先登录ChatGPT!"; } const p = { plan_name: "chatgptteamplan", team_plan_data: { workspace_name: "Sepa", price_interval: "month", seat_quantity: 5 }, billing_details: { country: "DE", currency: "EUR" }, promo_campaign: { promo_campaign_id: "team-1-month-free", is_coupon_from_query_param: true }, checkout_ui_mode: "redirect" }; const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", { method: "POST", headers: { Authorization: "Bearer " + t.accessToken, "Content-Type": "application/json" }, body: JSON.stringify(p) }); const d = await r.json(); if(d.url){ window.location.href = d.url; return "success"; } else { return "提取失败:" + (d.detail || JSON.stringify(d)); } } catch(e) { return "发生错误:" + e; } })(); ''' result = page.run_js(checkout_js) log_progress(f"JS 执行结果: {result}") # 等待跳转到支付页(使用 URL 检测代替固定等待) try: page.wait.url_change('pay.openai.com', timeout=15) log_progress("[OK] 已跳转到支付页") log_progress(f"支付页URL: {page.url}") except: time.sleep(1) log_progress(f"当前URL: {page.url}") # 执行支付流程 result = run_payment_flow(page, email) # 保存账号信息 if result and result.get("token"): save_account( email, password, result["token"], result.get("account_id", "") ) except Exception as e: log_status("崩溃", f"注册/订阅转换阶段异常: {e}") else: log_status("失败", "未能找到验证码。") except Exception as e: log_status("崩溃", f"全局错误: {e}") print("\n" + "="*60) input("按回车键退出...") page.quit() def run_single_registration(progress_callback=None, step_callback=None) -> dict: """执行单次注册流程 (供 Bot 调用) Args: progress_callback: 进度回调函数 (message: str) - 用于日志 step_callback: 步骤回调函数 (step: str) - 用于更新 Bot 显示的当前步骤 Returns: dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str} """ def log_cb(msg): if progress_callback: progress_callback(msg) else: log_progress(msg) def step_cb(step): if step_callback: step_callback(step) # 检查必要配置 if not MAIL_API_TOKEN or not MAIL_API_BASE: return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"} # 检查域名 domains = get_email_domains() if not domains: return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"} # 检查 IBAN ibans = get_sepa_ibans() if not ibans: return {"success": False, "error": "没有可用的 IBAN,请先通过 /iban_add 导入"} step_cb("生成账号信息...") # 生成账号信息 random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15)) email_domain = random.choice(domains) email = f"{random_str}{email_domain}" password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \ ''.join(random.choices(string.ascii_lowercase, k=8)) + \ ''.join(random.choices(string.digits, k=2)) + \ random.choice('!@#$%') real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" log_status("初始化", f"生成账号: {email}") log_status("初始化", f"设置密码: {password}") # 检测操作系统 is_linux = platform.system() == "Linux" step_cb("启动浏览器...") # 获取随机指纹 fingerprint = None if RANDOM_FINGERPRINT: fingerprint = get_random_fingerprint() log_status("指纹", f"已注入: {fingerprint['webgl_renderer'][:40]}...") else: fingerprint = { "user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36", "platform": "Win32", "webgl_vendor": "Google Inc. (NVIDIA)", "webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)", "screen": {"width": 1920, "height": 1080} } # 配置浏览器 co = ChromiumOptions() co.set_argument('--no-first-run') co.set_argument('--disable-infobars') co.set_argument('--incognito') co.set_argument('--disable-gpu') co.set_argument('--disable-dev-shm-usage') co.set_argument('--no-sandbox') co.set_argument('--disable-blink-features=AutomationControlled') co.set_argument('--lang=zh-CN') co.set_argument(f'--user-agent={fingerprint["user_agent"]}') if is_linux: co.set_argument('--disable-software-rasterizer') co.set_argument('--disable-extensions') co.set_argument('--disable-setuid-sandbox') co.set_argument('--single-process') co.set_argument('--remote-debugging-port=0') chrome_paths = [ '/usr/bin/google-chrome', '/usr/bin/google-chrome-stable', '/usr/bin/chromium-browser', '/usr/bin/chromium', '/snap/bin/chromium', ] for chrome_path in chrome_paths: if os.path.exists(chrome_path): co.set_browser_path(chrome_path) break else: co.auto_port(True) co.set_local_port(random.randint(19222, 29999)) screen = fingerprint.get("screen", {"width": 1920, "height": 1080}) co.set_argument('--headless=new') co.set_argument(f'--window-size={screen["width"]},{screen["height"]}') page = None try: page = ChromiumPage(co) if RANDOM_FINGERPRINT: inject_fingerprint(page, fingerprint) except Exception as e: log_status("浏览器", f"启动失败: {e},尝试清理后重试...") cleanup_chrome_processes() time.sleep(2) # 重新配置 co2 = ChromiumOptions() co2.set_argument('--no-first-run') co2.set_argument('--disable-infobars') co2.set_argument('--incognito') co2.set_argument('--disable-gpu') co2.set_argument('--disable-dev-shm-usage') co2.set_argument('--no-sandbox') co2.set_argument('--disable-blink-features=AutomationControlled') co2.set_argument('--lang=zh-CN') co2.set_argument(f'--user-agent={fingerprint["user_agent"]}') co2.set_argument('--headless=new') co2.set_argument(f'--window-size={screen["width"]},{screen["height"]}') if is_linux: co2.set_argument('--disable-software-rasterizer') co2.set_argument('--disable-extensions') co2.set_argument('--disable-setuid-sandbox') co2.set_argument('--single-process') co2.set_argument('--remote-debugging-port=0') for chrome_path in chrome_paths: if os.path.exists(chrome_path): co2.set_browser_path(chrome_path) break else: co2.auto_port(True) co2.set_local_port(random.randint(30000, 39999)) try: page = ChromiumPage(co2) if RANDOM_FINGERPRINT: inject_fingerprint(page, fingerprint) except Exception as e2: return {"success": False, "error": f"浏览器启动失败: {e2}", "account": email, "password": password} try: step_cb("打开注册页面...") log_status("步骤 1/5", "打开 ChatGPT 注册页...") page.get(TARGET_URL) log_progress(f"当前URL: {page.url}") step_cb("点击登录按钮...") login_btn = page.ele('@data-testid=login-button', timeout=30) if not login_btn: login_btn = page.ele('css:button[data-testid*="login"], a[href*="auth"]', timeout=10) login_btn.click() time.sleep(2) log_progress(f"当前URL: {page.url}") step_cb("填写邮箱...") log_progress("填入邮箱...") email_input = page.ele('@name=email', timeout=30) email_input.input(email) page.ele('xpath://button[@type="submit"]').click() time.sleep(2) log_progress(f"当前URL: {page.url}") step_cb("填写密码...") log_progress("填入密码...") password_input = page.ele('xpath://input[@type="password"]', timeout=30) password_input.input(password) page.ele('xpath://button[@type="submit"]').click() time.sleep(2) log_progress(f"当前URL: {page.url}") step_cb("等待验证邮件...") log_status("步骤 2/5", "等待邮件 (All Mail)...") verify_data = get_verification_content(email) if not verify_data: return {"success": False, "error": "未能获取验证码", "account": email, "password": password} step_cb("执行邮箱验证...") log_status("步骤 3/5", "执行验证...") if verify_data['type'] == 'link': page.new_tab(verify_data['val']) time.sleep(5) log_progress(f"当前URL: {page.url}") elif verify_data['type'] == 'code': code = verify_data['val'] log_progress(f"填入验证码: {code}") code_input = page.ele('css:input[autocomplete="one-time-code"], input[name="code"], #code', timeout=15) code_input.input(code) time.sleep(1) try: log_progress("尝试点击继续按钮...") continue_btn = page.ele('css:button[type="submit"]', timeout=5) if continue_btn: continue_btn.click() except: log_progress("未找到按钮或已自动跳转...") time.sleep(2) log_progress(f"当前URL: {page.url}") # 资料填写 step_cb("填写个人信息...") log_status("步骤 4/5", "进入信息填写页...") log_progress(f"当前URL: {page.url}") name_input = page.ele('@name=name', timeout=20) name_input.input(real_name) log_progress(f"姓名: {real_name}") # 生成随机生日 birth_year, birth_month, birth_day = generate_random_birthday() # 优先使用精确输入,失败则使用备用方案 if not _input_birthday_precise(page, birth_year, birth_month, birth_day): _input_birthday_fallback(page, birth_year, birth_month, birth_day) time.sleep(1.5) final_reg_btn = page.ele('css:button[type="submit"]', timeout=20) final_reg_btn.click() time.sleep(2) log_progress(f"当前URL: {page.url}") # === 检测提交后是否有错误或需要额外操作 === submit_success = False for check_attempt in range(10): current_url = page.url # 检查是否已跳转出 about-you 页面 if 'about-you' not in current_url: log_progress(f"[OK] 页面已跳转: {current_url[:50]}...") submit_success = True break # 检查是否有错误提示 try: error_elem = page.ele('css:[role="alert"], [class*="error"], [class*="Error"]', timeout=1) if error_elem and error_elem.text: log_progress(f"[!] 发现错误提示: {error_elem.text[:50]}") except: pass # 检查是否有未勾选的 checkbox(如服务条款) try: unchecked = page.ele('css:input[type="checkbox"]:not(:checked)', timeout=1) if unchecked: log_progress("发现未勾选的选项,尝试勾选...") unchecked.click() time.sleep(0.5) # 重新点击提交 final_reg_btn = page.ele('css:button[type="submit"]', timeout=5) if final_reg_btn: final_reg_btn.click() time.sleep(2) except: pass # 检查是否有 CAPTCHA try: captcha = page.ele('css:iframe[src*="captcha"], iframe[src*="recaptcha"], [class*="captcha"]', timeout=1) if captcha: log_progress("[!] 检测到 CAPTCHA,需要人工处理或等待...") except: pass time.sleep(1) if not submit_success: log_progress(f"[!] 提交后仍在 about-you 页面,当前URL: {page.url}") # 等待进入主页 - 快速检测 step_cb("等待进入主页...") log_status("订阅", "等待进入 ChatGPT 主页...") # 快速检测循环(减少等待时间) entered_main = False for i in range(20): # 最多等待 10 秒 current_url = page.url # 更宽松的判断:只要不在 auth/login 页面就认为进入了主页 if 'chatgpt.com' in current_url: if '/auth' not in current_url and '/login' not in current_url and 'auth0' not in current_url: log_progress(f"[OK] 已进入主页: {current_url}") entered_main = True break time.sleep(0.5) # 减少等待间隔 if not entered_main: log_progress("[!] 等待主页超时,尝试继续...") log_progress(f"当前URL: {page.url}") # 尝试直接访问主页 page.get("https://chatgpt.com/") time.sleep(2) log_progress(f"跳转后URL: {page.url}") # 跳转到支付页 step_cb("跳转到支付页...") log_status("订阅", "执行 JS 跳转到支付页...") # 直接执行 JS 跳转到支付页(无需额外等待) checkout_js = ''' (async function(){ try { const t = await(await fetch("/api/auth/session")).json(); if(!t.accessToken){ return "请先登录ChatGPT!"; } const p = { plan_name: "chatgptteamplan", team_plan_data: { workspace_name: "Sepa", price_interval: "month", seat_quantity: 5 }, billing_details: { country: "DE", currency: "EUR" }, promo_campaign: { promo_campaign_id: "team-1-month-free", is_coupon_from_query_param: true }, checkout_ui_mode: "redirect" }; const r = await fetch("https://chatgpt.com/backend-api/payments/checkout", { method: "POST", headers: { Authorization: "Bearer " + t.accessToken, "Content-Type": "application/json" }, body: JSON.stringify(p) }); const d = await r.json(); if(d.url){ window.location.href = d.url; return "success"; } else { return "提取失败:" + (d.detail || JSON.stringify(d)); } } catch(e) { return "发生错误:" + e; } })(); ''' result = page.run_js(checkout_js) log_progress(f"JS 执行结果: {result}") # 等待跳转到支付页 try: page.wait.url_change('pay.openai.com', timeout=15) log_progress("[OK] 已跳转到支付页") log_progress(f"支付页URL: {page.url}") except: time.sleep(1) log_progress(f"当前URL: {page.url}") # 执行支付流程 step_cb("执行 SEPA 支付...") result = run_payment_flow(page, email, step_cb) if result and result.get("stopped"): # 被 /stop 命令中断 log_status("停止", "[!] 注册被用户停止") return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password} elif result and result.get("token"): step_cb("注册成功!") log_status("完成", "[OK] 注册成功!") return { "success": True, "account": email, "password": password, "token": result["token"], "account_id": result.get("account_id", "") } else: log_status("失败", "注册失败: 支付流程失败") return {"success": False, "error": "支付流程失败", "account": email, "password": password} except Exception as e: error_msg = str(e) # 只有连接断开才认为是停止请求 if _is_connection_lost(error_msg): log_status("停止", "[!] 浏览器连接断开") return {"success": False, "error": "浏览器连接断开", "stopped": True, "account": email, "password": password} log_status("错误", f"注册异常: {e}") return {"success": False, "error": str(e), "account": email, "password": password} finally: if page: try: page.quit() except: pass cleanup_chrome_processes() def run_single_registration_api(progress_callback=None, step_callback=None, proxy: str = None, max_user_retries: int = 3) -> dict: """执行单次注册流程 - 协议模式 (API + API 支付) 支付失败时会自动重新创建用户重试,最多重试 max_user_retries 次 Args: progress_callback: 进度回调函数 (message: str) step_callback: 步骤回调函数 (step: str) proxy: 代理地址 max_user_retries: 支付失败后重新创建用户的最大次数 Returns: dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str} """ def log_cb(msg): if progress_callback: progress_callback(msg) else: log_progress(msg) def step_cb(step): if step_callback: step_callback(step) # 检查协议模式是否可用 if not API_MODE_AVAILABLE: return {"success": False, "error": "协议模式不可用,请安装 curl_cffi: pip install curl_cffi"} # 检查必要配置 if not MAIL_API_TOKEN or not MAIL_API_BASE: return {"success": False, "error": "配置错误: 请在 config.toml 中配置 [autogptplus] 段"} # 检查域名 domains = get_email_domains() if not domains: return {"success": False, "error": "没有可用的邮箱域名,请先通过 /domain_add 导入"} # 检查 IBAN ibans = get_sepa_ibans() if not ibans: return {"success": False, "error": "没有可用的 IBAN,请先通过 /iban_add 导入"} # 使用配置的代理或传入的代理 use_proxy = proxy or API_PROXY or None if use_proxy: log_status("代理", f"使用代理: {use_proxy}") last_error = "" for user_retry in range(max_user_retries): if _is_shutdown_requested(): return {"success": False, "error": "用户停止", "stopped": True} if user_retry > 0: log_status("重试", f"========== 重新创建用户 (第 {user_retry + 1}/{max_user_retries} 次) ==========") step_cb("生成账号信息...") # 生成账号信息 random_str = ''.join(random.choices(string.ascii_lowercase + string.digits, k=15)) email_domain = random.choice(domains) email = f"{random_str}{email_domain}" password = ''.join(random.choices(string.ascii_uppercase, k=2)) + \ ''.join(random.choices(string.ascii_lowercase, k=8)) + \ ''.join(random.choices(string.digits, k=2)) + \ random.choice('!@#$%') real_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}" # 生成生日 year = random.randint(2000, 2004) month = random.randint(1, 12) if month in [1, 3, 5, 7, 8, 10, 12]: max_day = 31 elif month in [4, 6, 9, 11]: max_day = 30 else: max_day = 29 if year % 4 == 0 else 28 day = random.randint(1, max_day) birthdate = f"{year}-{month:02d}-{day:02d}" log_status("初始化", f"生成账号: {email}") log_status("初始化", f"设置密码: {password}") log_status("初始化", f"姓名: {real_name} | 生日: {birthdate}") log_status("模式", "协议模式 (纯 API)") try: # 阶段 1: API 注册 step_cb("API 快速注册...") log_status("阶段 1", "========== API 快速注册 ==========") reg = api_register_flow( email=email, password=password, real_name=real_name, birthdate=birthdate, mail_api_base=MAIL_API_BASE, mail_api_token=MAIL_API_TOKEN, proxy=use_proxy, progress_callback=log_cb ) if not reg: log_status("失败", "API 注册失败") last_error = "API 注册失败" continue # 重新创建用户 log_status("完成", "[OK] API 注册成功!") # 阶段 2: API 支付 step_cb("API 支付...") log_status("阶段 2", "========== API 支付 ==========") result = browser_pay_with_cookies( reg=reg, email=email, proxy=use_proxy, step_callback=step_cb, max_retries=3 # IBAN 重试 3 次 ) if result and result.get("stopped"): log_status("停止", "[!] 注册被用户停止") return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password} if result and result.get("token"): step_cb("注册成功!") log_status("完成", "[OK] 全部流程完成!") return { "success": True, "account": email, "password": password, "token": result["token"], "account_id": result.get("account_id", "") } if result and result.get("payment_failed"): log_status("失败", "支付失败,准备重新创建用户...") last_error = "支付流程失败" continue # 重新创建用户 # 其他失败情况 log_status("失败", "注册成功但支付/获取token失败") last_error = "支付流程失败" continue # 重新创建用户 except ShutdownRequested: log_status("停止", "[!] 用户请求停止") return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password} except Exception as e: error_msg = str(e) if _is_connection_lost(error_msg): log_status("停止", "[!] 连接断开") return {"success": False, "error": "连接断开", "stopped": True, "account": email, "password": password} log_status("错误", f"注册异常: {e}") last_error = str(e) continue # 重新创建用户 finally: cleanup_chrome_processes() # 所有重试都失败 log_status("失败", f"已重试 {max_user_retries} 次,全部失败") return {"success": False, "error": f"重试 {max_user_retries} 次后仍失败: {last_error}"} def run_single_registration_auto(progress_callback=None, step_callback=None, mode: str = None) -> dict: """自动选择模式执行注册 Args: progress_callback: 进度回调 step_callback: 步骤回调 mode: 强制指定模式 ("browser" / "api"),None 则使用配置 Returns: dict: 注册结果 """ use_mode = mode or REGISTER_MODE if use_mode == "api": if not API_MODE_AVAILABLE: log_status("警告", "协议模式不可用,回退到浏览器模式") return run_single_registration(progress_callback, step_callback) return run_single_registration_api(progress_callback, step_callback) else: return run_single_registration(progress_callback, step_callback) def get_register_mode() -> str: """获取当前注册模式""" return REGISTER_MODE def set_register_mode(mode: str) -> bool: """设置注册模式 (运行时) Args: mode: "browser" 或 "api" Returns: bool: 是否设置成功 """ global REGISTER_MODE if mode in ("browser", "api"): if mode == "api" and not API_MODE_AVAILABLE: return False REGISTER_MODE = mode return True return False def is_api_mode_supported() -> bool: """检查协议模式是否支持""" return API_MODE_AVAILABLE if __name__ == "__main__": run_main_process()