# ==================== 浏览器自动化模块 ====================
# 处理 OpenAI 注册、Codex 授权等浏览器自动化操作
# 使用 DrissionPage 替代 Selenium
# 支持协议模式 (API) 和浏览器模式
import time
import random
import subprocess
import os
import platform
from contextlib import contextmanager
from DrissionPage import ChromiumPage, ChromiumOptions
from config import (
BROWSER_WAIT_TIMEOUT,
BROWSER_SHORT_WAIT,
BROWSER_HEADLESS,
BROWSER_RANDOM_FINGERPRINT,
AUTH_PROVIDER,
get_random_name,
get_random_birthday,
get_random_fingerprint
)
from email_service import unified_get_verification_code
from crs_service import crs_generate_auth_url, crs_exchange_code, crs_add_account, extract_code_from_url
from cpa_service import (
cpa_generate_auth_url,
cpa_submit_callback,
cpa_poll_auth_status,
is_cpa_callback_url
)
from s2a_service import (
s2a_generate_auth_url,
s2a_create_account_from_oauth
)
from logger import log
# 导入协议模式模块
try:
from api_register import (
api_register_account_only,
is_api_mode_available as _is_api_mode_available,
)
API_MODE_AVAILABLE = _is_api_mode_available()
except ImportError:
API_MODE_AVAILABLE = False
api_register_account_only = None
# 进度更新 (Telegram Bot 使用)
try:
from bot_notifier import progress_update
except ImportError:
def progress_update(account=None, phase=None, step=None, role=None): pass
# ==================== 停止检查 ====================
class ShutdownRequested(Exception):
"""停止请求异常 - 用于中断浏览器操作"""
pass
def check_shutdown():
"""检查是否收到停止请求,如果是则抛出异常"""
try:
import run
if run._shutdown_requested:
log.warning("检测到停止请求,中断当前操作...")
raise ShutdownRequested("用户请求停止")
except ImportError:
pass
except ShutdownRequested:
raise
except Exception:
pass
# ==================== 浏览器配置常量 ====================
BROWSER_MAX_RETRIES = 3 # 浏览器启动最大重试次数
BROWSER_RETRY_DELAY = 2 # 重试间隔 (秒)
PAGE_LOAD_TIMEOUT = 15 # 页面加载超时 (秒)
# ==================== 输入速度配置 (模拟真人) ====================
# 设置为 True 使用更安全的慢速模式,False 使用快速模式
SAFE_MODE = True
TYPING_DELAY = 0.12 if SAFE_MODE else 0.06 # 打字基础延迟
ACTION_DELAY = (1.0, 2.0) if SAFE_MODE else (0.3, 0.8) # 操作间隔范围
# ==================== URL 监听与日志 ====================
_last_logged_url = None # 记录上次日志的URL,避免重复
# ==================== 调试截图 ====================
DEBUG_SCREENSHOT = os.environ.get('DEBUG_SCREENSHOT', '').lower() == 'true'
SCREENSHOT_DIR = os.path.join(os.path.dirname(__file__), 'debug_screenshots')
def save_debug_screenshot(page, name: str):
"""保存调试截图并发送到 Telegram (仅在 DEBUG_SCREENSHOT=true 时生效)
Args:
page: 浏览器页面对象
name: 截图名称 (不含扩展名)
"""
if not DEBUG_SCREENSHOT:
return
try:
if not os.path.exists(SCREENSHOT_DIR):
os.makedirs(SCREENSHOT_DIR)
timestamp = time.strftime('%Y%m%d_%H%M%S')
filepath = os.path.join(SCREENSHOT_DIR, f'{timestamp}_{name}.png')
page.get_screenshot(filepath)
log.info(f"[DEBUG] 截图已保存: {filepath}")
# 发送截图到 Telegram Bot
try:
from bot_notifier import send_screenshot_sync
caption = f"🔍 Debug Screenshot\n{name}"
send_screenshot_sync(filepath, caption)
except Exception:
pass # Bot 未启动时忽略
except Exception as e:
log.warning(f"截图保存失败: {e}")
def log_current_url(page, context: str = None, force: bool = False):
"""记录当前页面URL (完整地址)
Args:
page: 浏览器页面对象
context: 上下文描述 (如 "点击继续后", "输入邮箱后")
force: 是否强制记录 (即使URL未变化)
"""
global _last_logged_url
try:
current_url = page.url
# 只在URL变化时记录,除非强制记录
if force or current_url != _last_logged_url:
_last_logged_url = current_url
# 解析URL获取关键信息
url_info = _parse_url_info(current_url)
# 左对齐格式输出
if context:
if url_info:
log.info(f"[URL] {context} | {current_url} | {url_info}")
else:
log.info(f"[URL] {context} | {current_url}")
else:
if url_info:
log.info(f"[URL] {current_url} | {url_info}")
else:
log.info(f"[URL] {current_url}")
except Exception as e:
log.warning(f"获取URL失败: {e}")
def _parse_url_info(url: str) -> str:
"""解析URL,返回页面类型描述
Args:
url: 页面URL
Returns:
str: 页面类型描述
"""
if not url:
return ""
# OpenAI Auth 页面
if "auth.openai.com" in url:
if "/log-in-or-create-account" in url:
return "登录/注册选择页"
elif "/log-in/password" in url:
return "密码登录页"
elif "/create-account/password" in url:
return "创建账号密码页"
elif "/email-verification" in url:
return "邮箱验证码页"
elif "/about-you" in url:
return "个人信息填写页"
elif "/authorize" in url:
return "授权确认页"
elif "/callback" in url:
return "回调处理页"
else:
return "OpenAI 认证页"
# ChatGPT 页面
elif "chatgpt.com" in url:
if "/auth" in url:
return "ChatGPT 认证页"
else:
return "ChatGPT 主页"
# 回调页面
elif "localhost:1455" in url:
if "/auth/callback" in url:
return "本地授权回调页"
else:
return "本地服务页"
return ""
def log_url_change(page, old_url: str, action: str = None):
"""记录URL变化 (显示完整地址,左对齐)
Args:
page: 浏览器页面对象
old_url: 变化前的URL
action: 触发变化的操作描述
"""
global _last_logged_url
try:
new_url = page.url
if new_url != old_url:
_last_logged_url = new_url # 更新记录,避免重复日志
new_info = _parse_url_info(new_url)
# 左对齐格式: [URL] 操作 | 新地址 | 页面类型
if action:
if new_info:
log.info(f"[URL] {action} | {new_url} | {new_info}")
else:
log.info(f"[URL] {action} | {new_url}")
else:
if new_info:
log.info(f"[URL] 跳转 | {new_url} | {new_info}")
else:
log.info(f"[URL] 跳转 | {new_url}")
except Exception as e:
log.warning(f"记录URL变化失败: {e}")
def acquire_lock_with_keepalive(lock, page, timeout: float = 120, check_interval: float = 1.0) -> bool:
"""获取锁的同时保持浏览器活跃,防止等待期间浏览器连接断开
在并发模式下,多个 Worker 可能需要等待授权回调锁。
如果等待时间过长 (>15-30秒),浏览器连接可能会因为空闲而断开。
此函数通过定期访问页面来保持浏览器活跃。
Args:
lock: threading.Lock 对象
page: 浏览器页面对象
timeout: 最大等待时间 (秒)
check_interval: 检查间隔 (秒),默认 1 秒
Returns:
bool: 是否成功获取锁(且浏览器仍可用)
"""
import threading
start_time = time.time()
keepalive_counter = 0
while time.time() - start_time < timeout:
# 尝试非阻塞获取锁
acquired = lock.acquire(blocking=False)
if acquired:
# 获取锁成功,验证浏览器是否仍然可用
try:
_ = page.url # 测试连接
page.run_js("1+1") # 确保 JS 引擎可用
return True
except Exception as e:
# 浏览器已断开,释放锁让其他人重试
log.warning(f"获取锁后发现浏览器已断开: {e}")
lock.release()
return False
# 未获取到锁,保持浏览器活跃
try:
# 每次都访问页面属性以保持连接活跃
current_url = page.url
# 执行轻量级 JavaScript 保持活跃
page.run_js("1+1")
# 每 5 次(约 5 秒)执行一次更积极的保活操作
keepalive_counter += 1
if keepalive_counter % 5 == 0:
# 导航到 about:blank 再返回,强制保持连接
try:
page.run_js("window.focus()") # 尝试激活窗口
except Exception:
pass
except Exception as e:
# 浏览器连接已断开
log.warning(f"保持浏览器活跃时出错: {e}")
return False
# 等待一小段时间后重试(使用较短间隔)
time.sleep(check_interval)
# 超时,尝试最后一次阻塞获取 (短暂)
acquired = lock.acquire(blocking=True, timeout=0.1)
if acquired:
# 验证浏览器
try:
_ = page.url
return True
except Exception:
lock.release()
return False
return False
def cleanup_chrome_processes(force: bool = False):
"""清理残留的 Chrome 进程 (跨平台支持)
Args:
force: 是否强制清理。在并发模式下默认跳过,除非 force=True
"""
# 检查是否在并发模式下运行,如果是则跳过清理(避免杀死其他 Worker 的浏览器)
if not force:
try:
from config import CONCURRENT_ENABLED
if CONCURRENT_ENABLED:
# 并发模式下跳过全局清理,避免影响其他 Worker
return
except ImportError:
pass
try:
if platform.system() == "Windows":
# Windows: 使用 taskkill 清理 chromedriver 和 chrome
try:
subprocess.run(
['taskkill', '/F', '/IM', 'chromedriver.exe'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程 (带 --headless 参数的)
try:
result = subprocess.run(
['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
pid = line.strip()
if pid.isdigit():
subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
except Exception:
pass
log.step("已清理 Chrome 残留进程")
else:
# Linux/Mac: 使用 pkill
try:
subprocess.run(
['pkill', '-f', 'chromedriver'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程
try:
subprocess.run(
['pkill', '-f', 'chrome.*--headless'],
capture_output=True, timeout=5
)
except Exception:
pass
log.step("已清理 Chrome 残留进程")
except Exception:
pass # 静默处理,不影响主流程
def _inject_fingerprint(page: ChromiumPage, fingerprint: dict):
"""注入浏览器指纹伪装脚本
Args:
page: 浏览器页面对象
fingerprint: 指纹配置字典
"""
try:
webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. (NVIDIA)")
webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)")
platform = fingerprint.get("platform", "Win32")
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
# 注入指纹伪装脚本 (使用 try-catch 避免属性不可重定义的错误)
js_script = f'''
(function() {{
// 伪装 WebGL 指纹
try {{
const getParameterProxyHandler = {{
apply: function(target, thisArg, args) {{
const param = args[0];
if (param === 37445) return "{webgl_vendor}";
if (param === 37446) return "{webgl_renderer}";
return Reflect.apply(target, thisArg, args);
}}
}};
const originalGetParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler);
if (typeof WebGL2RenderingContext !== 'undefined') {{
const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter;
WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler);
}}
}} catch(e) {{}}
// 伪装 platform
try {{
Object.defineProperty(navigator, 'platform', {{ get: () => "{platform}", configurable: true }});
}} catch(e) {{}}
// 伪装屏幕分辨率
try {{
Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]}, configurable: true }});
Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]}, configurable: true }});
Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]}, configurable: true }});
Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]}, configurable: true }});
}} catch(e) {{}}
// 隐藏 webdriver 特征
try {{
Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined, configurable: true }});
}} catch(e) {{}}
// 伪装 languages
try {{
Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"], configurable: true }});
}} catch(e) {{}}
// 伪装 plugins
try {{
Object.defineProperty(navigator, 'plugins', {{
get: () => [
{{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }},
{{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }},
{{ name: "Native Client", filename: "internal-nacl-plugin" }}
],
configurable: true
}});
}} catch(e) {{}}
}})();
'''
page.run_js(js_script)
except Exception as e:
log.warning(f"指纹注入失败: {e}")
def init_browser(max_retries: int = BROWSER_MAX_RETRIES) -> ChromiumPage:
"""初始化 DrissionPage 浏览器 (带重试机制)
Args:
max_retries: 最大重试次数
Returns:
ChromiumPage: 浏览器实例
"""
log.info("初始化浏览器...", icon="browser")
# 根据配置决定是否使用随机指纹
fingerprint = None
if BROWSER_RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
log.info(f"指纹: {fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}", icon="config")
else:
# 使用默认指纹
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
log.info("指纹: 默认 (RTX 3060, 1920x1080)", icon="config")
last_error = None
is_linux = platform.system() == "Linux"
for attempt in range(max_retries):
try:
# 首次尝试或重试前清理残留进程
if attempt > 0:
log.warning(f"浏览器启动重试 ({attempt + 1}/{max_retries})...")
cleanup_chrome_processes()
time.sleep(BROWSER_RETRY_DELAY)
co = ChromiumOptions()
co.set_argument('--no-first-run')
co.set_argument('--disable-infobars')
co.set_argument('--incognito') # 无痕模式
co.set_argument('--disable-gpu') # 减少资源占用
co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题
co.set_argument('--no-sandbox') # 服务器环境需要
co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征
# 并发模式:为每个实例创建独立的用户数据目录
import tempfile
temp_user_data = tempfile.mkdtemp(prefix="chrome_worker_")
co.set_argument(f'--user-data-dir={temp_user_data}')
# 设置 User-Agent
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
# 设置语言为中文简体
co.set_argument('--lang=zh-CN')
# Linux 服务器特殊配置
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
# 注意: 不使用 --single-process,因为并发模式下会导致实例冲突
co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口
# 尝试查找 Chrome/Chromium 路径
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
log.step(f"使用浏览器: {chrome_path}")
break
else:
co.auto_port() # Windows 使用自动分配端口
# 无头模式 (服务器运行)
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
if BROWSER_HEADLESS:
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
log.step("启动 Chrome (无头模式)...")
else:
log.step("启动 Chrome (无痕模式)...")
# 设置超时
co.set_timeouts(base=PAGE_LOAD_TIMEOUT, page_load=PAGE_LOAD_TIMEOUT * 2)
page = ChromiumPage(co)
# 存储临时目录路径,用于后续清理
page._temp_user_data_dir = temp_user_data
# 注入指纹伪装脚本 (仅在启用随机指纹时)
if BROWSER_RANDOM_FINGERPRINT:
_inject_fingerprint(page, fingerprint)
log.success("浏览器启动成功")
return page
except Exception as e:
last_error = e
log.warning(f"浏览器启动失败 (尝试 {attempt + 1}/{max_retries}): {e}")
# 清理可能的残留
cleanup_chrome_processes()
# 所有重试都失败
log.error(f"浏览器启动失败,已重试 {max_retries} 次: {last_error}")
raise last_error
@contextmanager
def browser_context(max_retries: int = BROWSER_MAX_RETRIES):
"""浏览器上下文管理器 - 自动管理浏览器生命周期
使用示例:
with browser_context() as page:
page.get("https://example.com")
# 做一些操作...
# 浏览器会自动关闭
Args:
max_retries: 浏览器启动最大重试次数
Yields:
ChromiumPage: 浏览器页面实例
"""
page = None
temp_dir = None
try:
page = init_browser(max_retries)
# 获取临时目录路径
temp_dir = getattr(page, '_temp_user_data_dir', None)
yield page
finally:
if page:
log.step("关闭浏览器...")
try:
page.quit()
except Exception as e:
log.warning(f"浏览器关闭异常: {e}")
finally:
# 确保清理残留进程
cleanup_chrome_processes()
# 清理临时用户数据目录
if temp_dir and os.path.exists(temp_dir):
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
@contextmanager
def browser_context_with_retry(max_browser_retries: int = 2):
"""带重试机制的浏览器上下文管理器
在整体流程失败时自动重试,适用于注册/授权等复杂流程
使用示例:
with browser_context_with_retry() as ctx:
for attempt in ctx.attempts():
try:
page = ctx.page
# 做一些操作...
break # 成功则退出
except Exception as e:
ctx.handle_error(e)
Args:
max_browser_retries: 最大重试次数
Yields:
BrowserRetryContext: 重试上下文对象
"""
ctx = BrowserRetryContext(max_browser_retries)
try:
yield ctx
finally:
ctx.cleanup()
class BrowserRetryContext:
"""浏览器重试上下文"""
def __init__(self, max_retries: int = 2):
self.max_retries = max_retries
self.current_attempt = 0
self.page = None
self._should_continue = True
def attempts(self):
"""生成重试迭代器"""
for attempt in range(self.max_retries):
if not self._should_continue:
break
# 检查停止请求
check_shutdown()
self.current_attempt = attempt
# 非首次尝试时的清理和等待
if attempt > 0:
log.warning(f"重试整体流程 ({attempt + 1}/{self.max_retries})...")
self._cleanup_page()
cleanup_chrome_processes()
time.sleep(2)
# 初始化浏览器
try:
check_shutdown() # 再次检查
self.page = init_browser()
yield attempt
except ShutdownRequested:
self._should_continue = False
raise
except Exception as e:
log.error(f"浏览器初始化失败: {e}")
if attempt >= self.max_retries - 1:
raise
def handle_error(self, error: Exception):
"""处理错误,决定是否继续重试"""
# 如果是停止请求,直接停止
if isinstance(error, ShutdownRequested):
self._should_continue = False
return
log.error(f"流程异常: {error}")
if self.current_attempt >= self.max_retries - 1:
self._should_continue = False
else:
log.warning("准备重试...")
def stop(self):
"""停止重试"""
self._should_continue = False
def _cleanup_page(self):
"""清理当前页面"""
if self.page:
# 获取临时目录路径
temp_dir = getattr(self.page, '_temp_user_data_dir', None)
try:
self.page.quit()
except Exception:
pass
# 清理临时用户数据目录
if temp_dir and os.path.exists(temp_dir):
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
self.page = None
def cleanup(self):
"""最终清理"""
if self.page:
log.step("关闭浏览器...")
# 获取临时目录路径
temp_dir = getattr(self.page, '_temp_user_data_dir', None)
try:
self.page.quit()
except Exception:
pass
# 清理临时用户数据目录
if temp_dir and os.path.exists(temp_dir):
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
self.page = None
def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) -> bool:
"""等待页面稳定 (页面加载完成且 DOM 不再变化)
Args:
page: 浏览器页面对象
timeout: 超时时间 (秒)
check_interval: 检查间隔 (秒)
Returns:
bool: 是否稳定
"""
start_time = time.time()
last_html_len = 0
stable_count = 0
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
# 检查浏览器标签页是否还在加载(favicon 旋转动画)
ready_state = page.run_js('return document.readyState', timeout=2)
if ready_state != 'complete':
stable_count = 0
time.sleep(check_interval)
continue
current_len = len(page.html)
if current_len == last_html_len:
stable_count += 1
if stable_count >= 3: # 连续 3 次检查都稳定
return True
else:
stable_count = 0
last_html_len = current_len
time.sleep(check_interval)
except ShutdownRequested:
raise
except Exception:
time.sleep(check_interval)
return False
def check_and_handle_error_page(page, max_retries: int = 2) -> bool:
"""检测并处理错误页面(如 Operation timed out)
Args:
page: 浏览器页面对象
max_retries: 最大重试次数
Returns:
bool: 是否成功处理(页面恢复正常)
"""
for attempt in range(max_retries):
# 检测错误页面
error_text = page.ele('text:糟糕,出错了', timeout=1) or \
page.ele('text:Something went wrong', timeout=1) or \
page.ele('text:Operation timed out', timeout=1)
if not error_text:
return True # 没有错误,正常
log.warning(f"检测到错误页面,尝试重试 ({attempt + 1}/{max_retries})...")
# 点击重试按钮
retry_btn = page.ele('text:重试', timeout=2) or page.ele('text:Retry', timeout=1)
if retry_btn:
retry_btn.click()
time.sleep(3)
wait_for_page_stable(page, timeout=8)
else:
# 没有重试按钮,刷新页面
page.refresh()
time.sleep(3)
wait_for_page_stable(page, timeout=8)
# 最后再检查一次
error_text = page.ele('text:糟糕,出错了', timeout=1) or page.ele('text:Something went wrong', timeout=1)
return error_text is None
def wait_for_element(page, selector: str, timeout: int = 10, visible: bool = True):
"""智能等待元素出现
Args:
page: 浏览器页面对象
selector: CSS 选择器
timeout: 超时时间 (秒)
visible: 是否要求元素可见
Returns:
元素对象或 None
"""
start_time = time.time()
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
element = page.ele(selector, timeout=1)
if element:
if not visible or (element.states.is_displayed if hasattr(element, 'states') else True):
return element
except ShutdownRequested:
raise
except Exception:
pass
time.sleep(0.3)
return None
def wait_for_url_change(page, old_url: str, timeout: int = 15, contains: str = None) -> bool:
"""等待 URL 变化
Args:
page: 浏览器页面对象
old_url: 原始 URL
timeout: 超时时间 (秒)
contains: 新 URL 需要包含的字符串 (可选)
Returns:
bool: URL 是否已变化
"""
start_time = time.time()
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
current_url = page.url
if current_url != old_url:
if contains is None or contains in current_url:
return True
except ShutdownRequested:
raise
except Exception:
pass
time.sleep(0.5)
return False
def type_slowly(page, selector_or_element, text, base_delay=None):
"""缓慢输入文本 (模拟真人输入)
Args:
page: 浏览器页面对象 (用于重新获取元素)
selector_or_element: CSS 选择器字符串或元素对象
text: 要输入的文本
base_delay: 基础延迟 (秒),默认使用 TYPING_DELAY
"""
if base_delay is None:
base_delay = TYPING_DELAY
# 获取元素 (如果传入的是选择器则查找,否则直接使用)
if isinstance(selector_or_element, str):
element = page.ele(selector_or_element, timeout=10)
else:
element = selector_or_element
if not text:
return
# 对于短文本(如验证码),直接一次性输入,速度更快
if len(text) <= 8:
element.input(text, clear=True)
return
# 长文本使用逐字符输入
element.input(text[0], clear=True)
time.sleep(random.uniform(0.1, 0.2))
# 逐个输入剩余字符,不重新获取元素
for char in text[1:]:
element.input(char, clear=False)
# 随机延迟
actual_delay = base_delay * random.uniform(0.5, 1.2)
if char in ' @._-':
actual_delay *= 1.3
time.sleep(actual_delay)
def human_delay(min_sec: float = None, max_sec: float = None):
"""模拟人类操作间隔
Args:
min_sec: 最小延迟 (秒),默认使用 ACTION_DELAY[0]
max_sec: 最大延迟 (秒),默认使用 ACTION_DELAY[1]
"""
if min_sec is None:
min_sec = ACTION_DELAY[0]
if max_sec is None:
max_sec = ACTION_DELAY[1]
time.sleep(random.uniform(min_sec, max_sec))
def _detect_page_language(page) -> str:
"""检测页面语言
Returns:
str: 'zh' 中文, 'en' 英文
"""
try:
# 检查页面是否包含中文特征
html = page.html
# 检查中文关键词
chinese_keywords = ['确认', '继续', '登录', '注册', '验证', '邮箱', '密码', '姓名', '生日']
for keyword in chinese_keywords:
if keyword in html:
return 'zh'
return 'en'
except Exception:
return 'en' # 默认英文
def _input_birthday(page, birthday: dict):
"""智能输入生日 (自动适配中英文界面)
中文界面: yyyy/mm/dd (年/月/日)
英文界面: mm/dd/yyyy (月/日/年)
Args:
page: 浏览器页面对象
birthday: {'year': '2000', 'month': '01', 'day': '15'}
"""
# 获取三个输入框
year_input = wait_for_element(page, 'css:[data-type="year"]', timeout=10)
month_input = wait_for_element(page, 'css:[data-type="month"]', timeout=5)
day_input = wait_for_element(page, 'css:[data-type="day"]', timeout=5)
if not all([year_input, month_input, day_input]):
log.warning("未找到完整的生日输入框")
return False
# 检测页面语言
lang = _detect_page_language(page)
log.step(f"输入生日: {birthday['year']}/{birthday['month']}/{birthday['day']} (界面语言: {lang})")
# 根据语言决定输入顺序
if lang == 'zh':
# 中文: 年 -> 月 -> 日
input_order = [(year_input, birthday['year']),
(month_input, birthday['month']),
(day_input, birthday['day'])]
else:
# 英文: 月 -> 日 -> 年
input_order = [(month_input, birthday['month']),
(day_input, birthday['day']),
(year_input, birthday['year'])]
# 按顺序输入
for input_elem, value in input_order:
try:
input_elem.click()
time.sleep(0.15)
input_elem.input(value, clear=True)
time.sleep(0.2)
except Exception as e:
log.warning(f"生日输入异常: {e}")
log.success("生日已输入")
return True
def check_and_handle_error(page, max_retries=5) -> bool:
"""检查并处理页面错误 (带自动重试)"""
for attempt in range(max_retries):
try:
page_source = page.html.lower()
error_keywords = ['出错', 'error', 'timed out', 'operation timeout', 'route error', 'invalid content']
has_error = any(keyword in page_source for keyword in error_keywords)
if has_error:
try:
retry_btn = page.ele('css:button[data-dd-action-name="Try again"]', timeout=2)
if retry_btn:
log.warning(f"检测到错误页面,点击重试 ({attempt + 1}/{max_retries})...")
retry_btn.click()
wait_time = 3 + attempt # 递增等待,但减少基础时间
time.sleep(wait_time)
return True
except Exception:
time.sleep(1)
continue
return False
except Exception:
return False
return False
def retry_on_page_refresh(func):
"""装饰器: 页面刷新时自动重试"""
def wrapper(*args, **kwargs):
max_retries = 3
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
error_msg = str(e).lower()
if '页面被刷新' in error_msg or 'page refresh' in error_msg or 'stale' in error_msg:
if attempt < max_retries - 1:
log.warning(f"页面刷新,重试操作 ({attempt + 1}/{max_retries})...")
time.sleep(1)
continue
raise
return None
return wrapper
def is_logged_in(page, timeout: int = 5) -> bool:
"""检测是否已登录 ChatGPT (通过 API 请求判断)
通过请求 /api/auth/session 接口判断:
- 已登录: 返回包含 user 字段的 JSON
- 未登录: 返回 {}
"""
try:
# 使用 JavaScript 请求 session API,设置超时
result = page.run_js(f'''
return Promise.race([
fetch('/api/auth/session', {{
method: 'GET',
credentials: 'include'
}})
.then(r => r.json())
.then(data => JSON.stringify(data))
.catch(e => '{{}}'),
new Promise((_, reject) => setTimeout(() => reject('timeout'), {timeout * 1000}))
]).catch(() => '{{}}');
''', timeout=timeout + 2)
if result and result != '{}':
import json
data = json.loads(result)
if data.get('user') and data.get('accessToken'):
log.success(f"已登录: {data['user'].get('email', 'unknown')}")
return True
return False
except Exception as e:
log.warning(f"登录检测异常: {e}")
return False
def register_openai_account_api(email: str, password: str, proxy: str = None,
team_name: str = None, max_email_retries: int = 3) -> bool:
"""使用协议模式 (API) 注册 OpenAI 账号
如果验证码获取超时,会自动创建新邮箱重试(不进入浏览器模式)
Args:
email: 邮箱地址
password: 密码
proxy: 代理地址 (可选)
team_name: Team 名称 (用于邀请新邮箱)
max_email_retries: 验证码超时后最大重试次数 (创建新邮箱)
Returns:
bool: 是否成功
str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱
"""
if not API_MODE_AVAILABLE:
log.warning("协议模式不可用,回退到浏览器模式")
return None # 返回 None 表示需要回退
current_email = email
current_password = password
for retry in range(max_email_retries):
if retry > 0:
log.warning(f"验证码超时,尝试创建新邮箱 (重试 {retry}/{max_email_retries - 1})...")
# 创建新邮箱
from email_service import unified_create_email
new_email, new_password = unified_create_email()
if not new_email:
log.error("创建新邮箱失败")
continue
# 如果有 team_name,邀请新邮箱到 Team
if team_name:
from team_service import invite_single_to_team
from config import TEAMS
# 查找 team 配置
team = None
for t in TEAMS:
if t.get("name") == team_name:
team = t
break
if team:
log.step(f"邀请新邮箱到 Team: {new_email}")
if not invite_single_to_team(new_email, team):
log.error("新邮箱邀请失败")
continue
log.success(f"新邮箱邀请成功: {new_email}")
current_email = new_email
current_password = new_password
log.info(f"[API模式] 开始注册 OpenAI 账号: {current_email}", icon="account")
# 生成随机姓名和生日
random_name = get_random_name()
birthday = get_random_birthday()
birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}"
log.step(f"姓名: {random_name}, 生日: {birthdate}")
# 验证码超时标志
verification_timeout = False
# 定义获取验证码的函数 (5秒超时)
def get_code(target_email):
nonlocal verification_timeout
progress_update(phase="注册", step="等待验证码...")
log.step("等待验证码邮件 (5秒超时)...")
# 使用较短的超时时间: 5 次快速重试,每次 1 秒
code, error, email_time = unified_get_verification_code(
target_email,
max_retries=5, # 5 次重试
interval=1 # 每次间隔 1 秒
)
if code:
log.success(f"获取到验证码: {code}")
return code
else:
verification_timeout = True
log.warning("验证码获取超时 (5秒)")
return None
# 执行 API 注册
try:
result = api_register_account_only(
email=current_email,
password=current_password,
real_name=random_name,
birthdate=birthdate,
get_verification_code_func=get_code,
proxy=proxy,
progress_callback=lambda msg: log.step(msg)
)
if result:
log.success(f"[API模式] 注册完成: {current_email}")
# 如果使用了新邮箱,返回特殊标记
if current_email != email:
return f"new_email:{current_email}:{current_password}"
return True
elif verification_timeout:
# 验证码超时,继续下一次重试(创建新邮箱)
log.warning("[API模式] 验证码超时,将创建新邮箱重试...")
continue
else:
log.warning("[API模式] 注册失败")
return False
except Exception as e:
log.error(f"[API模式] 注册异常: {e}")
if "验证码" in str(e) or "timeout" in str(e).lower():
# 验证码相关异常,继续重试
continue
return False
log.error(f"[API模式] 已重试 {max_email_retries} 次,全部失败")
return False
def register_openai_account_auto(page, email: str, password: str, use_api: bool = True,
proxy: str = None, team_name: str = None,
allow_fallback: bool = False) -> bool:
"""自动选择模式注册 OpenAI 账号
Args:
page: 浏览器实例 (用于浏览器模式回退)
email: 邮箱地址
password: 密码
use_api: 是否优先使用 API 模式
proxy: 代理地址 (API 模式使用)
team_name: Team 名称 (用于验证码超时时邀请新邮箱)
allow_fallback: 是否允许 API 失败后回退到浏览器模式 (默认 False)
Returns:
bool/str:
- True: 成功
- False: 失败
- "retry_new_email": API 失败,需要重新生成邮箱重试
- "new_email:xxx@xxx.com:password": 使用了新邮箱
- "domain_blacklisted": 域名被列入黑名单
"""
# 如果启用 API 模式且可用
if use_api and API_MODE_AVAILABLE:
result = register_openai_account_api(email, password, proxy, team_name)
if result is True:
return True
elif isinstance(result, str) and result.startswith("new_email:"):
# 使用了新邮箱,返回新邮箱信息
return result
elif result == "domain_blacklisted":
return "domain_blacklisted"
elif result is False:
if not allow_fallback:
log.warning("API 模式注册失败,需要重新生成邮箱重试")
return "retry_new_email"
log.warning("API 模式注册失败,回退到浏览器模式...")
# result is None 表示 API 模式不可用
elif result is None and not allow_fallback:
log.warning("API 模式不可用,需要重新生成邮箱重试")
return "retry_new_email"
# 如果不使用 API 或允许回退,使用浏览器模式
if not use_api or allow_fallback:
return register_openai_account(page, email, password)
return "retry_new_email"
def register_openai_account(page, email: str, password: str) -> bool:
"""使用浏览器注册 OpenAI 账号
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
bool: 是否成功
"""
log.info(f"开始注册 OpenAI 账号: {email}", icon="account")
try:
# 打开注册页面
url = "https://chatgpt.com"
log.step(f"打开 {url}")
page.get(url)
# 智能等待页面加载完成
wait_for_page_stable(page, timeout=8)
log_current_url(page, "页面加载完成", force=True)
# 检查页面是否正常加载
current_url = page.url
# 如果已经在 auth.openai.com,说明页面正常,直接继续
if "auth.openai.com" in current_url:
log.info("已跳转到认证页面")
else:
# 在 chatgpt.com,检查是否有注册按钮
page_ok = page.ele('css:[data-testid="signup-button"]', timeout=1) or \
page.ele('text:免费注册', timeout=1) or \
page.ele('text:Sign up for free', timeout=1) or \
page.ele('text:登录', timeout=1) or \
page.ele('text:Log in', timeout=1)
if not page_ok:
log.warning("页面加载异常,3秒后刷新...")
save_debug_screenshot(page, 'page_load_abnormal')
time.sleep(3)
page.refresh()
wait_for_page_stable(page, timeout=8)
log_current_url(page, "刷新后", force=True)
# 检测是否已登录 (通过 API 判断)
try:
if is_logged_in(page):
log.success("检测到已登录,跳过注册步骤")
return True
except Exception:
pass # 忽略登录检测异常,继续注册流程
# 点击"免费注册"按钮
log.step("点击免费注册...")
signup_btn = wait_for_element(page, 'css:[data-testid="signup-button"]', timeout=5)
if not signup_btn:
signup_btn = wait_for_element(page, 'text:免费注册', timeout=3)
if not signup_btn:
signup_btn = wait_for_element(page, 'text:Sign up for free', timeout=3)
if signup_btn:
old_url = page.url
signup_btn.click()
# 等待 URL 变化或弹窗/输入框出现 (最多3秒快速检测)
for _ in range(6):
time.sleep(0.5)
if page.url != old_url:
log_url_change(page, old_url, "点击注册按钮")
break
# 检测弹窗中的邮箱输入框
try:
email_input = page.ele('css:input[type="email"], input[name="email"]', timeout=1)
if email_input and email_input.states.is_displayed:
break
except Exception:
pass
current_url = page.url
log_current_url(page, "注册按钮点击后")
# 如果没有跳转到 auth.openai.com,检查是否在 chatgpt.com 弹窗中
if "auth.openai.com" not in current_url and "chatgpt.com" in current_url:
log.step("尝试在当前弹窗中输入邮箱...")
# 等待弹窗完全加载 (增加等待时间)
time.sleep(2)
# 快速检查弹窗是否正常加载(包含登录表单)- 增加更多选择器
login_form = wait_for_element(page, 'css:[data-testid="login-form"]', timeout=3)
if not login_form:
login_form = page.ele('text:登录或注册', timeout=2) or \
page.ele('text:Log in or sign up', timeout=2) or \
page.ele('text:Welcome back', timeout=1) or \
page.ele('text:欢迎回来', timeout=1) or \
page.ele('css:input[type="email"]', timeout=2) or \
page.ele('css:input[name="email"]', timeout=1)
if not login_form:
# 弹窗内容异常,关闭并刷新页面重试
log.warning("弹窗内容异常,刷新页面重试...")
save_debug_screenshot(page, 'popup_abnormal')
close_btn = page.ele('css:button[aria-label="Close"], button[aria-label="关闭"]', timeout=1)
if not close_btn:
close_btn = page.ele('css:button:has(svg)', timeout=1)
if close_btn:
close_btn.click()
time.sleep(0.5)
# 刷新页面
page.refresh()
wait_for_page_stable(page, timeout=8)
log_current_url(page, "刷新后", force=True)
# 重新点击注册按钮
log.step("重新点击免费注册...")
signup_btn = wait_for_element(page, 'css:[data-testid="signup-button"]', timeout=8) or \
wait_for_element(page, 'text:免费注册', timeout=5) or \
wait_for_element(page, 'text:Sign up for free', timeout=3) or \
wait_for_element(page, 'text:Get started', timeout=2)
if signup_btn:
signup_btn.click()
time.sleep(3) # 增加等待时间
# 再次检查弹窗
login_form = page.ele('css:[data-testid="login-form"]', timeout=5) or \
page.ele('text:登录或注册', timeout=3) or \
page.ele('css:input[type="email"]', timeout=3)
if not login_form:
log.error("重试后弹窗仍然异常,跳过此账号")
save_debug_screenshot(page, 'popup_retry_failed')
return False
else:
log.error("找不到注册按钮,跳过此账号")
save_debug_screenshot(page, 'signup_button_not_found')
return False
# 尝试输入邮箱
email_input = wait_for_element(page, 'css:input[type="email"], input[name="email"], input[id="email"]', timeout=5)
if email_input:
human_delay()
type_slowly(page, 'css:input[type="email"], input[name="email"], input[id="email"]', email)
log.success("邮箱已输入")
# 点击继续
human_delay(0.5, 1.0)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10, contains="/password")
# === 使用循环处理整个注册流程 ===
max_steps = 10 # 防止无限循环
for step in range(max_steps):
current_url = page.url
log_current_url(page, f"注册流程步骤 {step + 1}")
# 如果在 chatgpt.com 且已登录,注册成功
if "chatgpt.com" in current_url and "auth.openai.com" not in current_url:
try:
if is_logged_in(page):
log.success("检测到已登录,账号已注册成功")
return True
except Exception:
pass
# 步骤1: 输入邮箱 (在 log-in-or-create-account 页面)
if "auth.openai.com/log-in-or-create-account" in current_url:
progress_update(phase="注册", step="输入邮箱...")
log.step("等待邮箱输入框...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=15)
if not email_input:
log.error("无法找到邮箱输入框")
return False
human_delay() # 模拟人类思考时间
log.step("输入邮箱...")
type_slowly(page, 'css:input[type="email"]', email)
log.success("邮箱已输入")
# 点击继续
human_delay(0.5, 1.2)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
# 步骤2: 输入密码 (在密码页面: log-in/password 或 create-account/password)
if "auth.openai.com/log-in/password" in current_url or "auth.openai.com/create-account/password" in current_url:
progress_update(phase="注册", step="输入密码...")
# 先检查是否有密码错误提示,如果有则使用一次性验证码登录
try:
error_text = page.ele('text:Incorrect email address or password', timeout=1)
if error_text and error_text.states.is_displayed:
log.warning("密码错误,尝试使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=3)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if otp_btn:
old_url = page.url
otp_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
# 检查密码框是否已有内容(避免重复输入)
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=5)
if not password_input:
log.error("无法找到密码输入框")
return False
# 检查是否已输入密码
try:
current_value = password_input.attr('value') or ''
if len(current_value) > 0:
log.info("密码已输入,点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
log.step("等待密码输入框...")
human_delay() # 模拟人类思考时间
log.step("输入密码...")
type_slowly(page, 'css:input[type="password"]', password)
log.success("密码已输入")
# 点击继续
human_delay(0.5, 1.2)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
# 等待页面变化,检测是否密码错误
time.sleep(2)
# 检查是否出现密码错误提示
try:
error_text = page.ele('text:Incorrect email address or password', timeout=1)
if error_text and error_text.states.is_displayed:
log.warning("密码错误,尝试使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=3)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if otp_btn:
otp_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
wait_for_url_change(page, old_url, timeout=10)
continue
# 步骤3: 验证码页面
if "auth.openai.com/email-verification" in current_url:
break # 跳出循环,进入验证码流程
# 步骤4: 姓名/年龄页面 (账号已存在)
if "auth.openai.com/about-you" in current_url:
break # 跳出循环,进入补充信息流程
# 处理错误
if check_and_handle_error(page):
time.sleep(0.5)
continue
# 短暂等待页面变化
time.sleep(0.5)
# === 根据 URL 快速判断页面状态 ===
current_url = page.url
# 如果是 chatgpt.com 首页,说明已注册成功
if "chatgpt.com" in current_url and "auth.openai.com" not in current_url:
try:
if is_logged_in(page):
log.success("检测到已登录,账号已注册成功")
return True
except Exception:
pass
# 检测到姓名/年龄输入页面 (账号已存在,只需补充信息)
if "auth.openai.com/about-you" in current_url:
progress_update(phase="注册", step="补充个人信息...")
log_current_url(page, "个人信息页面")
log.info("检测到姓名输入页面,账号已存在,补充信息...")
# 等待页面加载
name_input = wait_for_element(page, 'css:input[name="name"]', timeout=5)
if not name_input:
name_input = wait_for_element(page, 'css:input[autocomplete="name"]', timeout=3)
# 输入姓名
random_name = get_random_name()
log.step(f"输入姓名: {random_name}")
type_slowly(page, 'css:input[name="name"], input[autocomplete="name"]', random_name)
# 输入生日 (使用智能适配函数)
birthday = get_random_birthday()
_input_birthday(page, birthday)
# 点击提交
log.step("点击最终提交...")
time.sleep(0.5)
submit_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if submit_btn:
submit_btn.click()
time.sleep(2)
log.success(f"注册完成: {email}")
return True
# 检测到验证码页面
needs_verification = "auth.openai.com/email-verification" in current_url
if needs_verification:
log_current_url(page, "邮箱验证码页面")
if not needs_verification:
# 检查验证码输入框是否存在
code_input = wait_for_element(page, 'css:input[name="code"]', timeout=3)
if code_input:
needs_verification = True
log_current_url(page, "邮箱验证码页面")
# 只有在 chatgpt.com 页面且已登录才能判断为成功
if not needs_verification:
try:
if "chatgpt.com" in page.url and is_logged_in(page):
log.success("账号已注册成功")
return True
except Exception:
pass
log.error("注册流程异常,未到达预期页面")
return False
# 获取验证码
progress_update(phase="注册", step="等待验证码...")
log.step("等待验证码邮件...")
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if not verification_code:
log.error("无法获取验证码")
return False
# 验证码重试循环 (最多重试 3 次)
max_code_retries = 3
for code_attempt in range(max_code_retries):
# 输入验证码
progress_update(phase="注册", step="输入验证码...")
log.step(f"输入验证码: {verification_code}")
while check_and_handle_error(page):
time.sleep(1)
# 重新获取输入框 (可能页面已刷新)
code_input = wait_for_element(page, 'css:input[name="code"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[placeholder*="代码"]', timeout=5)
if not code_input:
# 再次检查是否已登录
try:
if is_logged_in(page):
log.success("检测到已登录,跳过验证码输入")
return True
except Exception:
pass
log.error("无法找到验证码输入框")
return False
# 清空并输入验证码
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="code"], input[placeholder*="代码"]', verification_code, base_delay=0.08)
time.sleep(0.5)
# 点击继续
log.step("点击继续...")
for attempt in range(3):
try:
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=10)
if continue_btn:
continue_btn.click()
break
except Exception:
time.sleep(0.5)
time.sleep(2)
# 检查是否出现"代码不正确"错误
try:
error_text = page.ele('text:代码不正确', timeout=1)
if not error_text:
error_text = page.ele('text:incorrect', timeout=1)
if not error_text:
error_text = page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
# 点击"重新发送电子邮件"
resend_btn = page.ele('text:重新发送电子邮件', timeout=3)
if not resend_btn:
resend_btn = page.ele('text:Resend email', timeout=2)
if not resend_btn:
resend_btn = page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
# 重新获取验证码
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if verification_code:
continue # 继续下一次尝试
log.warning("无法重新发送验证码")
else:
log.error("验证码多次错误,放弃")
return False
else:
# 没有错误,验证码正确,跳出循环
break
except Exception:
# 没有检测到错误,继续
break
while check_and_handle_error(page):
time.sleep(0.5)
# 记录当前页面 (应该是 about-you 个人信息页面)
log_current_url(page, "验证码通过后-个人信息页面")
# 输入姓名 (随机外国名字)
random_name = get_random_name()
log.step(f"输入姓名: {random_name}")
name_input = wait_for_element(page, 'css:input[name="name"]', timeout=15)
if not name_input:
name_input = wait_for_element(page, 'css:input[autocomplete="name"]', timeout=5)
type_slowly(page, 'css:input[name="name"], input[autocomplete="name"]', random_name)
# 输入生日 (使用智能适配函数)
birthday = get_random_birthday()
_input_birthday(page, birthday)
# 最终提交
log.step("点击最终提交...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=10)
if continue_btn:
continue_btn.click()
# 等待并检查是否出现 "email not supported" 错误
time.sleep(2)
try:
error_text = page.ele('text:The email you provided is not supported', timeout=2)
if error_text and error_text.states.is_displayed:
log.error("邮箱域名不被支持,需要加入黑名单")
return "domain_blacklisted"
except Exception:
pass
log.success(f"注册完成: {email}")
time.sleep(1)
return True
except Exception as e:
log.error(f"注册失败: {e}")
return False
def perform_codex_authorization(page, email: str, password: str) -> dict:
"""执行 Codex 授权流程
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
dict: codex_data 或 None
"""
log.info(f"开始 Codex 授权: {email}", icon="code")
# 生成授权 URL
auth_url, session_id = crs_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取授权 URL")
return None
# 打开授权页面
log.step("打开授权页面...")
log.info(f"[URL] 授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
# 再次检测错误页面
check_and_handle_error_page(page)
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
# 可能是错误页面,再检测一次
if check_and_handle_error_page(page):
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=5)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if not email_input:
email_input = wait_for_element(page, '#email', timeout=5)
type_slowly(page, 'css:input[type="email"], input[name="email"], #email', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "输入邮箱后点击继续")
except Exception as e:
log.warning(f"邮箱输入步骤异常: {e}")
log_current_url(page, "邮箱步骤完成后")
# 检测错误页面
if check_and_handle_error_page(page):
# 错误重试后,检查当前页面状态
current_url = page.url
# 如果回到了登录页面,需要重新输入邮箱
if "auth.openai.com/log-in" in current_url and "/password" not in current_url:
log.info("重试后回到登录页,重新输入邮箱...")
try:
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"]', email, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
except Exception as e:
log.warning(f"重新输入邮箱异常: {e}")
# 再次检查当前 URL,确定下一步
current_url = page.url
# 只有在密码页面才输入密码
if "/password" in current_url or "log-in/password" in current_url or "create-account/password" in current_url:
try:
# 输入密码
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if not password_input:
# 可能是错误页面
if check_and_handle_error_page(page):
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=5)
if not password_input:
password_input = wait_for_element(page, 'css:input[name="password"]', timeout=5)
if password_input:
type_slowly(page, 'css:input[type="password"], input[name="password"]', password, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "输入密码后点击继续")
except Exception as e:
log.warning(f"密码输入步骤异常: {e}")
else:
# 不在密码页面,可能需要先输入邮箱
log.info(f"当前不在密码页面: {current_url}")
try:
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=3)
if email_input:
log.step("输入邮箱...")
type_slowly(page, 'css:input[type="email"]', email, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
# 现在应该在密码页面了
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
log.step("输入密码...")
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
except Exception as e:
log.warning(f"登录流程异常: {e}")
log_current_url(page, "密码步骤完成后")
# 等待授权回调
max_wait = 45 # 减少等待时间
start_time = time.time()
code = None
progress_shown = False
last_url_in_loop = None
log.step(f"等待授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录URL变化
if current_url != last_url_in_loop:
log_current_url(page, "等待回调中")
last_url_in_loop = current_url
# 检查是否到达回调页面
if "localhost:1455/auth/callback" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
log.success("获取到回调 URL")
log.info(f"[URL] 回调地址: {current_url}", icon="browser")
code = extract_code_from_url(current_url)
if code:
log.success("提取授权码成功")
break
# 尝试点击授权按钮
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
time.sleep(1.5) # 减少等待
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5) # 减少轮询间隔
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"检查异常: {e}")
time.sleep(1.5)
if not code:
if progress_shown:
log.progress_clear()
log.warning("授权超时")
try:
current_url = page.url
if "code=" in current_url:
code = extract_code_from_url(current_url)
except Exception:
pass
if not code:
log.error("无法获取授权码")
return None
# 交换 tokens
log.step("交换 tokens...")
codex_data = crs_exchange_code(code, session_id)
if codex_data:
log.success("Codex 授权成功")
return codex_data
else:
log.error("Token 交换失败")
return None
def perform_codex_authorization_with_otp(page, email: str) -> dict:
"""执行 Codex 授权流程 (使用一次性验证码登录,适用于已注册的 Team Owner)
Args:
page: 浏览器页面实例
email: 邮箱地址
Returns:
dict: codex_data 或 None
"""
log.info("开始 Codex 授权 (OTP 登录)...", icon="auth")
# 生成授权 URL
auth_url, session_id = crs_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取授权 URL")
return None
# 打开授权页面
log.step("打开授权页面...")
log.info(f"[URL] 授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "OTP授权页面加载完成", force=True)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
type_slowly(page, 'css:input[type="email"], input[name="email"], #email', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "OTP流程-输入邮箱后")
except Exception as e:
log.warning(f"邮箱输入步骤异常: {e}")
log_current_url(page, "OTP流程-邮箱步骤完成后")
try:
# 检查是否在密码页面,如果是则点击"使用一次性验证码登录"
current_url = page.url
if "/log-in/password" in current_url or "/password" in current_url:
log.step("检测到密码页面,点击使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=5)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if not otp_btn:
# 尝试通过按钮文本查找
buttons = page.eles('css:button')
for btn in buttons:
btn_text = btn.text.lower()
if '一次性验证码' in btn_text or 'one-time' in btn_text:
otp_btn = btn
break
if otp_btn:
old_url = page.url
otp_btn.click()
log.success("已点击一次性验证码登录按钮")
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "点击OTP按钮后")
else:
log.warning("未找到一次性验证码登录按钮")
else:
# 不在密码页面,尝试直接找 OTP 按钮
log.step("点击使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'css:button[value="passwordless_login_send_otp"]', timeout=10)
if not otp_btn:
otp_btn = wait_for_element(page, 'css:button._inlinePasswordlessLogin', timeout=5)
if not otp_btn:
buttons = page.eles('css:button')
for btn in buttons:
if '一次性验证码' in btn.text or 'one-time' in btn.text.lower():
otp_btn = btn
break
if otp_btn:
otp_btn.click()
log.success("已点击一次性验证码登录按钮")
time.sleep(2)
else:
log.warning("未找到一次性验证码登录按钮,尝试继续...")
except Exception as e:
log.warning(f"点击 OTP 按钮异常: {e}")
log_current_url(page, "OTP流程-准备获取验证码")
# 等待并获取验证码
log.step("等待验证码邮件...")
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
log.warning(f"自动获取验证码失败: {error}")
# 手动输入
verification_code = input("⚠️ 请手动输入验证码: ").strip()
if not verification_code:
log.error("未输入验证码")
return None
# 验证码重试循环 (最多重试 3 次)
max_code_retries = 3
for code_attempt in range(max_code_retries):
try:
# 输入验证码
log.step(f"输入验证码: {verification_code}")
code_input = wait_for_element(page, 'css:input[name="otp"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[type="text"]', timeout=5)
if not code_input:
code_input = wait_for_element(page, 'css:input[autocomplete="one-time-code"]', timeout=5)
if code_input:
# 清空并输入验证码
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="otp"], input[type="text"], input[autocomplete="one-time-code"]', verification_code, base_delay=0.08)
log.success("验证码已输入")
else:
log.error("未找到验证码输入框")
return None
# 点击继续/验证按钮
log.step("点击继续...")
time.sleep(1)
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
time.sleep(2)
# 检查是否出现"代码不正确"错误
try:
error_text = page.ele('text:代码不正确', timeout=1)
if not error_text:
error_text = page.ele('text:incorrect', timeout=1)
if not error_text:
error_text = page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
# 点击"重新发送电子邮件"
resend_btn = page.ele('text:重新发送电子邮件', timeout=3)
if not resend_btn:
resend_btn = page.ele('text:Resend email', timeout=2)
if not resend_btn:
resend_btn = page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
# 重新获取验证码
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if verification_code:
continue # 继续下一次尝试
log.warning("无法重新发送验证码")
else:
log.error("验证码多次错误,放弃")
return None
else:
# 没有错误,验证码正确,跳出循环
break
except Exception:
# 没有检测到错误元素,说明验证码正确,继续
break
except Exception as e:
log.warning(f"验证码输入步骤异常: {e}")
break
# 等待授权回调
max_wait = 45
start_time = time.time()
code = None
progress_shown = False
last_url_in_loop = None
log.step(f"等待授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录URL变化
if current_url != last_url_in_loop:
log_current_url(page, "OTP流程-等待回调中")
last_url_in_loop = current_url
# 检查是否到达回调页面
if "localhost:1455/auth/callback" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
log.success("获取到回调 URL")
log.info(f"[URL] 回调地址: {current_url}", icon="browser")
code = extract_code_from_url(current_url)
if code:
log.success("提取授权码成功")
break
# 尝试点击授权按钮
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
time.sleep(1.5)
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"检查异常: {e}")
time.sleep(1.5)
if not code:
if progress_shown:
log.progress_clear()
log.warning("授权超时")
try:
current_url = page.url
if "code=" in current_url:
code = extract_code_from_url(current_url)
except Exception:
pass
if not code:
log.error("无法获取授权码")
return None
# 交换 tokens
log.step("交换 tokens...")
codex_data = crs_exchange_code(code, session_id)
if codex_data:
log.success("Codex 授权成功 (OTP)")
return codex_data
else:
log.error("Token 交换失败")
return None
def login_and_authorize_with_otp(email: str) -> tuple[bool, dict]:
"""Team Owner 专用: 使用一次性验证码登录并完成 Codex 授权
Args:
email: 邮箱地址
Returns:
tuple: (success, codex_data)
- CRS 模式: codex_data 包含 tokens
- CPA 模式: codex_data 为 None (后台自动处理)
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 使用 OTP 登录
success = perform_cpa_authorization_with_otp(ctx.page, email)
if success:
return True, None # CPA 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("CPA OTP 授权失败,准备重试...")
continue
return False, None
else:
# CRS 模式: 使用 OTP 登录
codex_data = perform_codex_authorization_with_otp(ctx.page, email)
if codex_data:
return True, codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return False, None
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None
return False, None
def register_only(email: str, password: str, use_api_register: bool = True) -> str:
"""仅注册 OpenAI 账号,不进行授权 (用于并行注册 + 串行授权模式)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
Returns:
str: 注册结果
- "success": 注册成功
- "domain_blacklisted": 域名被列入黑名单
- "retry_new_email": API 失败,需要重新生成邮箱重试
- "failed": 注册失败
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
ctx.page, email, password,
use_api=use_api_register
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
return "domain_blacklisted"
# 检查是否需要重新生成邮箱
if register_result == "retry_new_email":
ctx.stop()
return "retry_new_email"
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
return "failed"
# 注册成功
log.success(f"注册成功: {email}")
return "success"
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return "failed"
return "failed"
def register_and_authorize(email: str, password: str, use_api_register: bool = True,
team_name: str = None) -> tuple:
"""完整流程: 注册 OpenAI + Codex 授权 (带重试机制)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
team_name: Team 名称 (用于验证码超时时邀请新邮箱)
Returns:
tuple: (register_success, codex_data, new_email_info)
- register_success: True/False/"domain_blacklisted"/"retry_new_email"
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
- new_email_info: 如果使用了新邮箱,返回 {"email": "xxx", "password": "xxx"},否则为 None
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
if AUTH_PROVIDER in ("cpa", "s2a"):
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
pass
# 用于跟踪是否使用了新邮箱
new_email_info = None
current_email = email
current_password = password
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
ctx.page, current_email, current_password,
use_api=use_api_register,
team_name=team_name
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
return "domain_blacklisted", None, None
# 检查是否需要重新生成邮箱
if register_result == "retry_new_email":
ctx.stop()
return "retry_new_email", None, None
# 检查是否使用了新邮箱
if isinstance(register_result, str) and register_result.startswith("new_email:"):
# 解析新邮箱信息: "new_email:xxx@xxx.com:password"
parts = register_result.split(":", 2)
if len(parts) >= 3:
current_email = parts[1]
current_password = parts[2]
new_email_info = {"email": current_email, "password": current_password}
log.success(f"使用新邮箱继续: {current_email}")
register_result = True
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
return False, None, new_email_info
# 短暂等待确保注册完成
time.sleep(0.5)
# ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ==========
# 使用 keepalive 获取锁,防止等待期间浏览器连接断开
lock_acquired = False
if auth_lock:
log.step("等待授权回调锁...")
lock_acquired = acquire_lock_with_keepalive(auth_lock, ctx.page, timeout=180, check_interval=2.0)
if lock_acquired:
log.step("获取授权回调锁,开始授权...")
else:
log.error("获取授权回调锁超时或浏览器连接断开")
continue # 重试
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, current_email, current_password)
return True, None, new_email_info if success else (True, None, new_email_info)
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
success = perform_s2a_authorization(ctx.page, current_email, current_password)
return True, None, new_email_info if success else (True, None, new_email_info)
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, current_email, current_password)
return True, codex_data, new_email_info
finally:
if auth_lock and lock_acquired:
log.step("释放授权回调锁")
auth_lock.release()
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None, new_email_info
return False, None, new_email_info
def authorize_only(email: str, password: str) -> tuple[bool, dict]:
"""仅执行 Codex 授权 (适用于已注册但未授权的账号)
Args:
email: 邮箱地址
password: 密码
Returns:
tuple: (success, codex_data)
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
if AUTH_PROVIDER in ("cpa", "s2a"):
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
pass
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ==========
# 使用 keepalive 获取锁,防止等待期间浏览器连接断开
lock_acquired = False
if auth_lock:
log.step("等待授权回调锁...")
lock_acquired = acquire_lock_with_keepalive(auth_lock, ctx.page, timeout=180, check_interval=2.0)
if lock_acquired:
log.step("获取授权回调锁,开始授权...")
else:
log.error("获取授权回调锁超时或浏览器连接断开")
continue # 重试
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
log.info("已注册账号,使用 CPA 进行 Codex 授权...", icon="auth")
success = perform_cpa_authorization(ctx.page, email, password)
if success:
return True, None # CPA 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("CPA 授权失败,准备重试...")
continue
return False, None
elif AUTH_PROVIDER == "s2a":
log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth")
success = perform_s2a_authorization(ctx.page, email, password)
if success:
return True, None # S2A 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("S2A 授权失败,准备重试...")
continue
return False, None
else:
# CRS 模式
log.info("已注册账号,直接进行 Codex 授权...", icon="auth")
codex_data = perform_codex_authorization(ctx.page, email, password)
if codex_data:
return True, codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return False, None
finally:
if auth_lock and lock_acquired:
log.step("释放授权回调锁")
auth_lock.release()
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None
return False, None
# ==================== CPA 授权函数 ====================
def perform_cpa_authorization(page, email: str, password: str) -> bool:
"""执行 CPA 授权流程 (密码登录)
与 CRS 的关键差异:
- CRS 使用 session_id,CPA 使用 state
- CRS 直接交换 code 得到 tokens,CPA 提交整个回调 URL 然后轮询状态
- CPA 授权成功后不需要手动添加账号,后台自动处理
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
bool: 授权是否成功
"""
log.info(f"开始 CPA 授权: {email}", icon="code")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
# 这里不再重复获取锁
try:
# 生成授权 URL
auth_url, state = cpa_generate_auth_url()
if not auth_url or not state:
log.error("无法获取 CPA 授权 URL")
return False
# 打开授权页面
log.step("打开 CPA 授权页面...")
log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "CPA授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入邮箱后点击继续")
except Exception as e:
log.warning(f"CPA 邮箱输入步骤异常: {e}")
log_current_url(page, "CPA-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入密码后点击继续")
except Exception as e:
log.warning(f"CPA 密码输入步骤异常: {e}")
log_current_url(page, "CPA-密码步骤完成后")
# 等待授权回调
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
auth_btn_clicked = False # 标记是否已点击授权按钮
log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录 URL 变化
if current_url != last_url_in_loop:
log_current_url(page, "CPA等待回调中")
last_url_in_loop = current_url
# URL 变化后重置授权按钮点击标记
auth_btn_clicked = False
# 检查是否到达回调页面 (CPA 使用 localhost:1455)
if is_cpa_callback_url(current_url):
if progress_shown:
log.progress_clear()
log.success("CPA 获取到回调 URL")
log.info(f"[URL] CPA回调地址: {current_url}", icon="browser")
callback_url = current_url
break
# 尝试点击授权按钮 (只点击一次)
if not auth_btn_clicked:
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
auth_btn_clicked = True # 标记已点击
time.sleep(2) # 等待页面响应
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[CPA等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"CPA检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("CPA 无法获取回调 URL")
return False
# CPA 特有流程: 提交回调 URL
log.step("提交 CPA 回调 URL...")
if not cpa_submit_callback(callback_url):
log.error("CPA 回调 URL 提交失败")
return False
# CPA 特有流程: 轮询授权状态
if cpa_poll_auth_status(state):
log.success("CPA Codex 授权成功")
return True
else:
log.error("CPA 授权状态检查失败")
return False
except Exception as e:
log.error(f"CPA 授权异常: {e}")
return False
def perform_cpa_authorization_with_otp(page, email: str) -> bool:
"""执行 CPA 授权流程 (使用一次性验证码登录)
Args:
page: 浏览器页面实例
email: 邮箱地址
Returns:
bool: 授权是否成功
"""
log.info("开始 CPA 授权 (OTP 登录)...", icon="auth")
# 生成授权 URL
auth_url, state = cpa_generate_auth_url()
if not auth_url or not state:
log.error("无法获取 CPA 授权 URL")
return False
# 打开授权页面
log.step("打开 CPA 授权页面...")
log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "CPA-OTP授权页面加载完成", force=True)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
type_slowly(page, 'css:input[type="email"], input[name="email"], #email', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-OTP流程-输入邮箱后")
except Exception as e:
log.warning(f"CPA OTP 邮箱输入步骤异常: {e}")
log_current_url(page, "CPA-OTP流程-邮箱步骤完成后")
try:
# 检查是否在密码页面,如果是则点击"使用一次性验证码登录"
current_url = page.url
if "/log-in/password" in current_url or "/password" in current_url:
log.step("检测到密码页面,点击使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=5)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if not otp_btn:
buttons = page.eles('css:button')
for btn in buttons:
btn_text = btn.text.lower()
if '一次性验证码' in btn_text or 'one-time' in btn_text:
otp_btn = btn
break
if otp_btn:
old_url = page.url
otp_btn.click()
log.success("已点击一次性验证码登录按钮")
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-点击OTP按钮后")
else:
log.warning("未找到一次性验证码登录按钮")
except Exception as e:
log.warning(f"CPA 点击 OTP 按钮异常: {e}")
log_current_url(page, "CPA-OTP流程-准备获取验证码")
# 等待并获取验证码
log.step("等待验证码邮件...")
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
log.warning(f"自动获取验证码失败: {error}")
verification_code = input(" 请手动输入验证码: ").strip()
if not verification_code:
log.error("未输入验证码")
return False
# 验证码重试循环
max_code_retries = 3
for code_attempt in range(max_code_retries):
try:
log.step(f"输入验证码: {verification_code}")
code_input = wait_for_element(page, 'css:input[name="otp"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[type="text"]', timeout=5)
if not code_input:
code_input = wait_for_element(page, 'css:input[autocomplete="one-time-code"]', timeout=5)
if code_input:
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="otp"], input[type="text"], input[autocomplete="one-time-code"]', verification_code, base_delay=0.08)
log.success("验证码已输入")
else:
log.error("未找到验证码输入框")
return False
log.step("点击继续...")
time.sleep(1)
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
continue_btn.click()
time.sleep(2)
# 检查验证码错误
try:
error_text = page.ele('text:代码不正确', timeout=1) or \
page.ele('text:incorrect', timeout=1) or \
page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
resend_btn = page.ele('text:重新发送电子邮件', timeout=3) or \
page.ele('text:Resend email', timeout=2) or \
page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" 请手动输入验证码: ").strip()
if verification_code:
continue
log.warning("无法重新发送验证码")
else:
log.error("验证码多次错误,放弃")
return False
else:
break
except Exception:
break
except Exception as e:
log.warning(f"CPA OTP 验证码输入步骤异常: {e}")
break
# 等待授权回调
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
auth_btn_clicked = False # 标记是否已点击授权按钮
log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
if current_url != last_url_in_loop:
log_current_url(page, "CPA-OTP流程-等待回调中")
last_url_in_loop = current_url
# URL 变化后重置授权按钮点击标记
auth_btn_clicked = False
if is_cpa_callback_url(current_url):
if progress_shown:
log.progress_clear()
log.success("CPA 获取到回调 URL")
log.info(f"[URL] CPA回调地址: {current_url}", icon="browser")
callback_url = current_url
break
# 尝试点击授权按钮 (只点击一次)
if not auth_btn_clicked:
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
auth_btn_clicked = True # 标记已点击
time.sleep(2) # 等待页面响应
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[CPA-OTP等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"CPA OTP 检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("CPA OTP 无法获取回调 URL")
return False
# CPA 特有流程: 提交回调 URL
log.step("提交 CPA 回调 URL...")
if not cpa_submit_callback(callback_url):
log.error("CPA 回调 URL 提交失败")
return False
# CPA 特有流程: 轮询授权状态
if cpa_poll_auth_status(state):
log.success("CPA Codex 授权成功 (OTP)")
return True
else:
log.error("CPA 授权状态检查失败")
return False
# ==================== S2A 授权函数 ====================
def perform_s2a_authorization(page, email: str, password: str) -> bool:
"""执行 S2A 授权流程 (密码登录)
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
bool: 授权是否成功
"""
log.info(f"开始 S2A 授权: {email}", icon="code")
progress_update(phase="授权", step="开始 S2A 授权...")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
# 这里不再重复获取锁
try:
# 生成授权 URL
auth_url, session_id = s2a_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取 S2A 授权 URL")
return False
# 打开授权页面
progress_update(phase="授权", step="打开授权页面...")
log.step("打开 S2A 授权页面...")
log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "S2A授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
progress_update(phase="授权", step="输入邮箱...")
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入邮箱后点击继续")
except Exception as e:
log.warning(f"S2A 邮箱输入步骤异常: {e}")
log_current_url(page, "S2A-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
progress_update(phase="授权", step="输入密码...")
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入密码后点击继续")
except Exception as e:
log.warning(f"S2A 密码输入步骤异常: {e}")
log_current_url(page, "S2A-密码步骤完成后")
# 检查是否在授权确认页面,如果是则点击授权按钮
time.sleep(1)
current_url = page.url
if "/authorize" in current_url or "consent" in current_url:
try:
log.step("检测到授权确认页面,点击授权按钮...")
auth_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if auth_btn:
btn_text = auth_btn.text.lower() if auth_btn.text else ""
if 'authorize' in btn_text or '授权' in btn_text or 'allow' in btn_text or 'continue' in btn_text:
old_url = page.url
auth_btn.click()
time.sleep(2)
if page.url != old_url:
log_url_change(page, old_url, "S2A-点击授权按钮后")
except Exception as e:
log.warning(f"S2A 授权按钮点击异常: {e}")
# 等待授权回调 (S2A 使用 localhost 回调)
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
auth_btn_clicked = False # 标记是否已点击授权按钮
error_detected = False # 标记是否检测到错误
progress_update(phase="授权", step="等待回调...")
log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录 URL 变化
if current_url != last_url_in_loop:
log_current_url(page, "S2A等待回调中")
last_url_in_loop = current_url
# URL 变化后重置授权按钮点击标记
auth_btn_clicked = False
error_detected = False
# 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口)
if "localhost" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
progress_shown = False
callback_url = current_url
log.success(f"捕获 S2A 回调 URL")
break
# 检测 OpenAI 错误页面
if not error_detected:
try:
error_elem = page.ele('text:Something went wrong', timeout=0.3) or \
page.ele('text:出错了', timeout=0.3) or \
page.ele('text:Access denied', timeout=0.3) or \
page.ele('text:rate limit', timeout=0.3)
if error_elem:
log.warning(f"检测到错误页面: {error_elem.text[:50] if error_elem.text else 'unknown'}")
error_detected = True
# 尝试刷新页面
page.refresh()
time.sleep(3)
continue
except Exception:
pass
# 检测错误
check_and_handle_error(page)
# 检查是否需要点击 Authorize (只点击一次)
if not auth_btn_clicked:
try:
auth_btn = page.ele('css:button[type="submit"]', timeout=0.5)
if auth_btn:
btn_text = auth_btn.text.lower() if auth_btn.text else ""
btn_enabled = auth_btn.states.is_enabled if hasattr(auth_btn, 'states') else True
btn_displayed = auth_btn.states.is_displayed if hasattr(auth_btn, 'states') else True
if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text or 'allow' in btn_text:
if btn_enabled and btn_displayed:
log.step(f"点击授权按钮: '{auth_btn.text}'")
old_url = page.url
auth_btn.click()
auth_btn_clicked = True # 标记已点击
time.sleep(3) # 增加等待时间
new_url = page.url
if new_url != old_url:
log_url_change(page, old_url, "S2A点击授权按钮后")
else:
log.warning(f"点击授权按钮后 URL 未变化,当前: {new_url[:80]}...")
else:
log.warning(f"授权按钮不可用: enabled={btn_enabled}, displayed={btn_displayed}")
except Exception as e:
log.warning(f"检查授权按钮异常: {e}")
elapsed = int(time.time() - start_time)
log.progress_inline(f"[S2A等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"S2A检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("S2A 无法获取回调链接")
return False
# 从回调 URL 中提取 code
code = extract_code_from_url(callback_url)
if not code:
log.error("S2A 无法从回调链接提取授权码")
return False
# S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证)
progress_update(phase="授权", step="提交授权码...")
log.step("正在提交 S2A 授权码...")
result = s2a_create_account_from_oauth(code, session_id, name=email)
if result:
log.success("S2A 授权流程完成")
return True
else:
log.error("S2A 账号入库失败")
return False
except Exception as e:
log.error(f"S2A 授权异常: {e}")
return False
# ==================== 格式3专用: 登录获取 Session ====================
def login_and_get_session(page, email: str, password: str) -> dict:
"""登录 ChatGPT 并获取 accessToken 和 account_id (格式3专用)
用于 team.json 格式3 (只有邮箱和密码,没有 token) 的 Team Owner
登录后从 /api/auth/session 获取 token 和 account_id
Args:
page: 浏览器页面实例
email: 邮箱
password: 密码
Returns:
dict: {"token": "...", "account_id": "..."} 或 None
"""
log.info(f"登录获取 Session: {email}", icon="account")
try:
# 打开 ChatGPT 登录页
url = "https://chatgpt.com"
log.step(f"打开 {url}")
page.get(url)
wait_for_page_stable(page, timeout=8)
log_current_url(page, "登录页面加载完成", force=True)
# 检查是否已登录
if is_logged_in(page):
log.info("已登录,直接获取 Session...")
return _fetch_session_data(page)
# 点击登录按钮
log.step("点击登录...")
login_btn = wait_for_element(page, 'css:[data-testid="login-button"]', timeout=5)
if not login_btn:
login_btn = wait_for_element(page, 'text:登录', timeout=3)
if not login_btn:
login_btn = wait_for_element(page, 'text:Log in', timeout=3)
if login_btn:
old_url = page.url
login_btn.click()
# 等待页面变化
for _ in range(6):
time.sleep(0.5)
if page.url != old_url:
log_url_change(page, old_url, "点击登录按钮")
break
# 检测弹窗中的邮箱输入框
try:
email_input = page.ele('css:input[type="email"], input[name="email"]', timeout=1)
if email_input and email_input.states.is_displayed:
break
except Exception:
pass
current_url = page.url
log_current_url(page, "登录按钮点击后")
# 登录流程循环
max_steps = 10
for step in range(max_steps):
current_url = page.url
log_current_url(page, f"登录流程步骤 {step + 1}")
# 检查是否已登录成功
if "chatgpt.com" in current_url and "auth.openai.com" not in current_url:
if is_logged_in(page):
log.success("登录成功")
# 检查并选择工作空间
_check_and_select_workspace(page)
time.sleep(1)
return _fetch_session_data(page)
# 步骤1: 输入邮箱
if "auth.openai.com/log-in-or-create-account" in current_url or \
("chatgpt.com" in current_url and "auth.openai.com" not in current_url):
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=5)
if email_input:
log.step("输入邮箱...")
human_delay()
type_slowly(page, 'css:input[type="email"]', email)
log.success("邮箱已输入")
human_delay(0.5, 1.0)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
# 步骤2: 输入密码
if "/password" in current_url:
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=5)
if password_input:
# 检查是否已输入密码
try:
current_value = password_input.attr('value') or ''
if len(current_value) > 0:
log.info("密码已输入,点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
log.step("输入密码...")
human_delay()
type_slowly(page, 'css:input[type="password"]', password)
log.success("密码已输入")
human_delay(0.5, 1.0)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
# 处理错误
if check_and_handle_error(page):
time.sleep(0.5)
continue
# 检查是否出现工作空间选择页面
if _check_and_select_workspace(page):
# 选择工作空间后继续
time.sleep(1)
continue
time.sleep(0.5)
# 最终检查是否登录成功
if is_logged_in(page):
# 再次检查工作空间选择
_check_and_select_workspace(page)
time.sleep(1)
log.success("登录成功")
return _fetch_session_data(page)
log.error("登录流程未完成")
return None
except Exception as e:
log.error(f"登录失败: {e}")
return None
def _check_and_select_workspace(page) -> bool:
"""检查并选择工作空间
如果出现"启动工作空间"页面,点击第一个"打开"按钮
Returns:
bool: 是否处理了工作空间选择
"""
try:
# 检查是否有"启动工作空间"文字
workspace_text = page.ele('text:启动工作空间', timeout=2)
if not workspace_text:
workspace_text = page.ele('text:Launch workspace', timeout=1)
if not workspace_text:
return False
log.info("检测到工作空间选择页面")
# 直接点击第一个"打开"按钮
open_btn = page.ele('text:打开', timeout=2)
if not open_btn:
open_btn = page.ele('text:Open', timeout=1)
if open_btn:
log.step("选择第一个工作空间...")
open_btn.click()
# 等待页面加载完成
wait_for_page_stable(page, timeout=10)
# 检查是否进入了职业选择页面(说明工作空间选择成功)
if _is_job_selection_page(page):
log.success("已进入工作空间")
return True
log.warning("未找到打开按钮")
return False
except Exception as e:
log.warning(f"检查工作空间异常: {e}")
return False
def _is_job_selection_page(page) -> bool:
"""检查是否在职业选择页面
出现"你从事哪种工作?"说明工作空间选择成功
Returns:
bool: 是否在职业选择页面
"""
try:
job_text = page.ele('text:你从事哪种工作', timeout=2)
if not job_text:
job_text = page.ele('text:What kind of work do you do', timeout=1)
return bool(job_text)
except Exception:
return False
def _fetch_session_data(page) -> dict:
"""访问 session API 页面获取 token 和 account_id
Args:
page: 浏览器页面实例
Returns:
dict: {"token": "...", "account_id": "..."} 或 None
"""
try:
import json as json_module
# 直接访问 session API 页面
log.step("获取 Session 数据...")
page.get("https://chatgpt.com/api/auth/session")
time.sleep(1)
# 获取页面内容(JSON)
body = page.ele('tag:body', timeout=5)
if not body:
log.error("无法获取页面内容")
return None
text = body.text
if not text or text == '{}':
log.error("Session 数据为空")
return None
data = json_module.loads(text)
token = data.get('accessToken')
user = data.get('user', {})
account = data.get('account', {})
account_id = account.get('id') if account else None
if token:
log.success(f"获取 Session 成功: {user.get('email', 'unknown')}")
if account_id:
log.info(f" account_id: {account_id[:20]}...")
else:
log.warning(" account_id: 未获取到")
return {
"token": token,
"account_id": account_id or ""
}
else:
log.error("Session 中没有 token")
return None
except Exception as e:
log.error(f"获取 Session 失败: {e}")
return None
def login_and_authorize_team_owner(email: str, password: str, proxy: dict = None) -> dict:
"""格式3专用: 登录获取 token/account_id 并同时进行授权
Args:
email: 邮箱
password: 密码
proxy: 代理配置 (可选)
Returns:
dict: {
"success": True/False, # 授权是否成功
"token": "...",
"account_id": "...",
"authorized": True/False # 是否已授权
}
"""
from config import format_proxy_url
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
page = ctx.page
if proxy:
proxy_url = format_proxy_url(proxy)
if proxy_url:
log.info(f"使用代理: {proxy.get('host')}:{proxy.get('port')}")
# 步骤1: 登录获取 Session
session_data = login_and_get_session(page, email, password)
if not session_data:
if attempt < ctx.max_retries - 1:
log.warning("登录失败,准备重试...")
continue
return {"success": False}
token = session_data["token"]
account_id = session_data["account_id"]
# 步骤2: 进行授权
if AUTH_PROVIDER == "cpa":
success = perform_cpa_authorization(page, email, password)
return {
"success": success,
"token": token,
"account_id": account_id,
"authorized": success
}
else:
codex_data = perform_codex_authorization(page, email, password)
if codex_data:
from crs_service import crs_add_account
crs_result = crs_add_account(email, codex_data)
return {
"success": bool(crs_result),
"token": token,
"account_id": account_id,
"authorized": bool(crs_result),
"crs_id": crs_result.get("id") if crs_result else None
}
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return {
"success": False,
"token": token,
"account_id": account_id,
"authorized": False
}
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return {"success": False}
return {"success": False}