Files
codexTool/browser_automation.py

3659 lines
142 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ==================== 浏览器自动化模块 ====================
# 处理 OpenAI 注册、Codex 授权等浏览器自动化操作
# 使用 DrissionPage 替代 Selenium
# 支持协议模式 (API) 和浏览器模式
import time
import random
import subprocess
import os
import platform
from contextlib import contextmanager
from DrissionPage import ChromiumPage, ChromiumOptions
from config import (
BROWSER_WAIT_TIMEOUT,
BROWSER_SHORT_WAIT,
BROWSER_HEADLESS,
BROWSER_RANDOM_FINGERPRINT,
AUTH_PROVIDER,
S2A_API_MODE,
get_random_name,
get_random_birthday,
get_random_fingerprint
)
from email_service import unified_get_verification_code
from crs_service import crs_generate_auth_url, crs_exchange_code, crs_add_account, extract_code_from_url
from cpa_service import (
cpa_generate_auth_url,
cpa_submit_callback,
cpa_poll_auth_status,
is_cpa_callback_url
)
from s2a_service import (
s2a_generate_auth_url,
s2a_create_account_from_oauth,
s2a_api_authorize,
)
from logger import log
# 导入协议模式模块
try:
from api_register import (
api_register_account_only,
is_api_mode_available as _is_api_mode_available,
)
API_MODE_AVAILABLE = _is_api_mode_available()
except ImportError:
API_MODE_AVAILABLE = False
api_register_account_only = None
# 进度更新 (Telegram Bot 使用)
try:
from bot_notifier import progress_update
except ImportError:
def progress_update(account=None, phase=None, step=None, role=None): pass
# ==================== 停止检查 ====================
class ShutdownRequested(Exception):
"""停止请求异常 - 用于中断浏览器操作"""
pass
def check_shutdown():
"""检查是否收到停止请求,如果是则抛出异常"""
try:
import run
if run._shutdown_requested:
log.warning("检测到停止请求,中断当前操作...")
raise ShutdownRequested("用户请求停止")
except ImportError:
pass
except ShutdownRequested:
raise
except Exception:
pass
# ==================== 浏览器配置常量 ====================
BROWSER_MAX_RETRIES = 3 # 浏览器启动最大重试次数
BROWSER_RETRY_DELAY = 2 # 重试间隔 (秒)
PAGE_LOAD_TIMEOUT = 15 # 页面加载超时 (秒)
# ==================== 输入速度配置 (模拟真人) ====================
# 设置为 True 使用更安全的慢速模式False 使用快速模式
SAFE_MODE = True
TYPING_DELAY = 0.12 if SAFE_MODE else 0.06 # 打字基础延迟
ACTION_DELAY = (1.0, 2.0) if SAFE_MODE else (0.3, 0.8) # 操作间隔范围
# ==================== URL 监听与日志 ====================
_last_logged_url = None # 记录上次日志的URL避免重复
# ==================== 调试截图 ====================
DEBUG_SCREENSHOT = os.environ.get('DEBUG_SCREENSHOT', '').lower() == 'true'
SCREENSHOT_DIR = os.path.join(os.path.dirname(__file__), 'debug_screenshots')
def save_debug_screenshot(page, name: str):
"""保存调试截图并发送到 Telegram (仅在 DEBUG_SCREENSHOT=true 时生效)
Args:
page: 浏览器页面对象
name: 截图名称 (不含扩展名)
"""
if not DEBUG_SCREENSHOT:
return
try:
if not os.path.exists(SCREENSHOT_DIR):
os.makedirs(SCREENSHOT_DIR)
timestamp = time.strftime('%Y%m%d_%H%M%S')
filepath = os.path.join(SCREENSHOT_DIR, f'{timestamp}_{name}.png')
page.get_screenshot(filepath)
log.info(f"[DEBUG] 截图已保存: {filepath}")
# 发送截图到 Telegram Bot
try:
from bot_notifier import send_screenshot_sync
caption = f"🔍 <b>Debug Screenshot</b>\n<code>{name}</code>"
send_screenshot_sync(filepath, caption)
except Exception:
pass # Bot 未启动时忽略
except Exception as e:
log.warning(f"截图保存失败: {e}")
def log_current_url(page, context: str = None, force: bool = False):
"""记录当前页面URL (完整地址)
Args:
page: 浏览器页面对象
context: 上下文描述 (如 "点击继续后", "输入邮箱后")
force: 是否强制记录 (即使URL未变化)
"""
global _last_logged_url
try:
current_url = page.url
# 只在URL变化时记录除非强制记录
if force or current_url != _last_logged_url:
_last_logged_url = current_url
# 解析URL获取关键信息
url_info = _parse_url_info(current_url)
# 左对齐格式输出
if context:
if url_info:
log.info(f"[URL] {context} | {current_url} | {url_info}")
else:
log.info(f"[URL] {context} | {current_url}")
else:
if url_info:
log.info(f"[URL] {current_url} | {url_info}")
else:
log.info(f"[URL] {current_url}")
except Exception as e:
log.warning(f"获取URL失败: {e}")
def _parse_url_info(url: str) -> str:
"""解析URL返回页面类型描述
Args:
url: 页面URL
Returns:
str: 页面类型描述
"""
if not url:
return ""
# OpenAI Auth 页面
if "auth.openai.com" in url:
if "/log-in-or-create-account" in url:
return "登录/注册选择页"
elif "/log-in/password" in url:
return "密码登录页"
elif "/create-account/password" in url:
return "创建账号密码页"
elif "/email-verification" in url:
return "邮箱验证码页"
elif "/about-you" in url:
return "个人信息填写页"
elif "/authorize" in url:
return "授权确认页"
elif "/callback" in url:
return "回调处理页"
else:
return "OpenAI 认证页"
# ChatGPT 页面
elif "chatgpt.com" in url:
if "/auth" in url:
return "ChatGPT 认证页"
else:
return "ChatGPT 主页"
# 回调页面
elif "localhost:1455" in url:
if "/auth/callback" in url:
return "本地授权回调页"
else:
return "本地服务页"
return ""
def log_url_change(page, old_url: str, action: str = None):
"""记录URL变化 (显示完整地址,左对齐)
Args:
page: 浏览器页面对象
old_url: 变化前的URL
action: 触发变化的操作描述
"""
global _last_logged_url
try:
new_url = page.url
if new_url != old_url:
_last_logged_url = new_url # 更新记录,避免重复日志
new_info = _parse_url_info(new_url)
# 左对齐格式: [URL] 操作 | 新地址 | 页面类型
if action:
if new_info:
log.info(f"[URL] {action} | {new_url} | {new_info}")
else:
log.info(f"[URL] {action} | {new_url}")
else:
if new_info:
log.info(f"[URL] 跳转 | {new_url} | {new_info}")
else:
log.info(f"[URL] 跳转 | {new_url}")
except Exception as e:
log.warning(f"记录URL变化失败: {e}")
def acquire_lock_with_keepalive(lock, page, timeout: float = 120, check_interval: float = 1.0) -> bool:
"""获取锁的同时保持浏览器活跃,防止等待期间浏览器连接断开
在并发模式下,多个 Worker 可能需要等待授权回调锁。
如果等待时间过长 (>15-30秒),浏览器连接可能会因为空闲而断开。
此函数通过定期访问页面来保持浏览器活跃。
Args:
lock: threading.Lock 对象
page: 浏览器页面对象
timeout: 最大等待时间 (秒)
check_interval: 检查间隔 (秒),默认 1 秒
Returns:
bool: 是否成功获取锁(且浏览器仍可用)
"""
import threading
start_time = time.time()
keepalive_counter = 0
while time.time() - start_time < timeout:
# 尝试非阻塞获取锁
acquired = lock.acquire(blocking=False)
if acquired:
# 获取锁成功,验证浏览器是否仍然可用
try:
_ = page.url # 测试连接
page.run_js("1+1") # 确保 JS 引擎可用
return True
except Exception as e:
# 浏览器已断开,释放锁让其他人重试
log.warning(f"获取锁后发现浏览器已断开: {e}")
lock.release()
return False
# 未获取到锁,保持浏览器活跃
try:
# 每次都访问页面属性以保持连接活跃
current_url = page.url
# 执行轻量级 JavaScript 保持活跃
page.run_js("1+1")
# 每 5 次(约 5 秒)执行一次更积极的保活操作
keepalive_counter += 1
if keepalive_counter % 5 == 0:
# 导航到 about:blank 再返回,强制保持连接
try:
page.run_js("window.focus()") # 尝试激活窗口
except Exception:
pass
except Exception as e:
# 浏览器连接已断开
log.warning(f"保持浏览器活跃时出错: {e}")
return False
# 等待一小段时间后重试(使用较短间隔)
time.sleep(check_interval)
# 超时,尝试最后一次阻塞获取 (短暂)
acquired = lock.acquire(blocking=True, timeout=0.1)
if acquired:
# 验证浏览器
try:
_ = page.url
return True
except Exception:
lock.release()
return False
return False
def cleanup_chrome_processes(force: bool = False):
"""清理残留的 Chrome 进程 (跨平台支持)
Args:
force: 是否强制清理。在并发模式下默认跳过,除非 force=True
"""
# 检查是否在并发模式下运行,如果是则跳过清理(避免杀死其他 Worker 的浏览器)
if not force:
try:
from config import CONCURRENT_ENABLED
if CONCURRENT_ENABLED:
# 并发模式下跳过全局清理,避免影响其他 Worker
return
except ImportError:
pass
try:
if platform.system() == "Windows":
# Windows: 使用 taskkill 清理 chromedriver 和 chrome
try:
subprocess.run(
['taskkill', '/F', '/IM', 'chromedriver.exe'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程 (带 --headless 参数的)
try:
result = subprocess.run(
['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
pid = line.strip()
if pid.isdigit():
subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
except Exception:
pass
log.step("已清理 Chrome 残留进程")
else:
# Linux/Mac: 使用 pkill
try:
subprocess.run(
['pkill', '-f', 'chromedriver'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程
try:
subprocess.run(
['pkill', '-f', 'chrome.*--headless'],
capture_output=True, timeout=5
)
except Exception:
pass
log.step("已清理 Chrome 残留进程")
except Exception:
pass # 静默处理,不影响主流程
def _inject_fingerprint(page: ChromiumPage, fingerprint: dict):
"""注入浏览器指纹伪装脚本
Args:
page: 浏览器页面对象
fingerprint: 指纹配置字典
"""
try:
webgl_vendor = fingerprint.get("webgl_vendor", "Google Inc. (NVIDIA)")
webgl_renderer = fingerprint.get("webgl_renderer", "ANGLE (NVIDIA)")
platform = fingerprint.get("platform", "Win32")
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
# 注入指纹伪装脚本 (使用 try-catch 避免属性不可重定义的错误)
js_script = f'''
(function() {{
// 伪装 WebGL 指纹
try {{
const getParameterProxyHandler = {{
apply: function(target, thisArg, args) {{
const param = args[0];
if (param === 37445) return "{webgl_vendor}";
if (param === 37446) return "{webgl_renderer}";
return Reflect.apply(target, thisArg, args);
}}
}};
const originalGetParameter = WebGLRenderingContext.prototype.getParameter;
WebGLRenderingContext.prototype.getParameter = new Proxy(originalGetParameter, getParameterProxyHandler);
if (typeof WebGL2RenderingContext !== 'undefined') {{
const originalGetParameter2 = WebGL2RenderingContext.prototype.getParameter;
WebGL2RenderingContext.prototype.getParameter = new Proxy(originalGetParameter2, getParameterProxyHandler);
}}
}} catch(e) {{}}
// 伪装 platform
try {{
Object.defineProperty(navigator, 'platform', {{ get: () => "{platform}", configurable: true }});
}} catch(e) {{}}
// 伪装屏幕分辨率
try {{
Object.defineProperty(screen, 'width', {{ get: () => {screen["width"]}, configurable: true }});
Object.defineProperty(screen, 'height', {{ get: () => {screen["height"]}, configurable: true }});
Object.defineProperty(screen, 'availWidth', {{ get: () => {screen["width"]}, configurable: true }});
Object.defineProperty(screen, 'availHeight', {{ get: () => {screen["height"]}, configurable: true }});
}} catch(e) {{}}
// 隐藏 webdriver 特征
try {{
Object.defineProperty(navigator, 'webdriver', {{ get: () => undefined, configurable: true }});
}} catch(e) {{}}
// 伪装 languages
try {{
Object.defineProperty(navigator, 'languages', {{ get: () => ["zh-CN", "zh", "en-US", "en"], configurable: true }});
}} catch(e) {{}}
// 伪装 plugins
try {{
Object.defineProperty(navigator, 'plugins', {{
get: () => [
{{ name: "Chrome PDF Plugin", filename: "internal-pdf-viewer" }},
{{ name: "Chrome PDF Viewer", filename: "mhjfbmdgcfjbbpaeojofohoefgiehjai" }},
{{ name: "Native Client", filename: "internal-nacl-plugin" }}
],
configurable: true
}});
}} catch(e) {{}}
}})();
'''
page.run_js(js_script)
except Exception as e:
log.warning(f"指纹注入失败: {e}")
def init_browser(max_retries: int = BROWSER_MAX_RETRIES) -> ChromiumPage:
"""初始化 DrissionPage 浏览器 (带重试机制)
Args:
max_retries: 最大重试次数
Returns:
ChromiumPage: 浏览器实例
"""
log.info("初始化浏览器...", icon="browser")
# 根据配置决定是否使用随机指纹
fingerprint = None
if BROWSER_RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
log.info(f"指纹: {fingerprint['webgl_renderer'][:45]}... | {fingerprint['screen']['width']}x{fingerprint['screen']['height']}", icon="config")
else:
# 使用默认指纹
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
}
log.info("指纹: 默认 (RTX 3060, 1920x1080)", icon="config")
last_error = None
is_linux = platform.system() == "Linux"
for attempt in range(max_retries):
try:
# 首次尝试或重试前清理残留进程
if attempt > 0:
log.warning(f"浏览器启动重试 ({attempt + 1}/{max_retries})...")
cleanup_chrome_processes()
time.sleep(BROWSER_RETRY_DELAY)
co = ChromiumOptions()
co.set_argument('--no-first-run')
co.set_argument('--disable-infobars')
co.set_argument('--incognito') # 无痕模式
co.set_argument('--disable-gpu') # 减少资源占用
co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题
co.set_argument('--no-sandbox') # 服务器环境需要
co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征
# 并发模式:为每个实例创建独立的用户数据目录
import tempfile
temp_user_data = tempfile.mkdtemp(prefix="chrome_worker_")
co.set_argument(f'--user-data-dir={temp_user_data}')
# 设置 User-Agent
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
# 设置语言为中文简体
co.set_argument('--lang=zh-CN')
# Linux 服务器特殊配置
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
# 注意: 不使用 --single-process因为并发模式下会导致实例冲突
co.set_argument('--remote-debugging-port=0') # 让系统自动分配端口
# 尝试查找 Chrome/Chromium 路径
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
log.step(f"使用浏览器: {chrome_path}")
break
else:
co.auto_port() # Windows 使用自动分配端口
# 无头模式 (服务器运行)
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
if BROWSER_HEADLESS:
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
log.step("启动 Chrome (无头模式)...")
else:
log.step("启动 Chrome (无痕模式)...")
# 设置超时
co.set_timeouts(base=PAGE_LOAD_TIMEOUT, page_load=PAGE_LOAD_TIMEOUT * 2)
page = ChromiumPage(co)
# 存储临时目录路径,用于后续清理
page._temp_user_data_dir = temp_user_data
# 注入指纹伪装脚本 (仅在启用随机指纹时)
if BROWSER_RANDOM_FINGERPRINT:
_inject_fingerprint(page, fingerprint)
log.success("浏览器启动成功")
return page
except Exception as e:
last_error = e
log.warning(f"浏览器启动失败 (尝试 {attempt + 1}/{max_retries}): {e}")
# 清理可能的残留
cleanup_chrome_processes()
# 所有重试都失败
log.error(f"浏览器启动失败,已重试 {max_retries} 次: {last_error}")
raise last_error
@contextmanager
def browser_context(max_retries: int = BROWSER_MAX_RETRIES):
"""浏览器上下文管理器 - 自动管理浏览器生命周期
使用示例:
with browser_context() as page:
page.get("https://example.com")
# 做一些操作...
# 浏览器会自动关闭
Args:
max_retries: 浏览器启动最大重试次数
Yields:
ChromiumPage: 浏览器页面实例
"""
page = None
temp_dir = None
try:
page = init_browser(max_retries)
# 获取临时目录路径
temp_dir = getattr(page, '_temp_user_data_dir', None)
yield page
finally:
if page:
log.step("关闭浏览器...")
try:
page.quit()
except Exception as e:
log.warning(f"浏览器关闭异常: {e}")
finally:
# 确保清理残留进程
cleanup_chrome_processes()
# 清理临时用户数据目录
if temp_dir and os.path.exists(temp_dir):
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
@contextmanager
def browser_context_with_retry(max_browser_retries: int = 2):
"""带重试机制的浏览器上下文管理器
在整体流程失败时自动重试,适用于注册/授权等复杂流程
使用示例:
with browser_context_with_retry() as ctx:
for attempt in ctx.attempts():
try:
page = ctx.page
# 做一些操作...
break # 成功则退出
except Exception as e:
ctx.handle_error(e)
Args:
max_browser_retries: 最大重试次数
Yields:
BrowserRetryContext: 重试上下文对象
"""
ctx = BrowserRetryContext(max_browser_retries)
try:
yield ctx
finally:
ctx.cleanup()
class BrowserRetryContext:
"""浏览器重试上下文"""
def __init__(self, max_retries: int = 2):
self.max_retries = max_retries
self.current_attempt = 0
self.page = None
self._should_continue = True
def attempts(self):
"""生成重试迭代器"""
for attempt in range(self.max_retries):
if not self._should_continue:
break
# 检查停止请求
check_shutdown()
self.current_attempt = attempt
# 非首次尝试时的清理和等待
if attempt > 0:
log.warning(f"重试整体流程 ({attempt + 1}/{self.max_retries})...")
self._cleanup_page()
cleanup_chrome_processes()
time.sleep(2)
# 初始化浏览器
try:
check_shutdown() # 再次检查
self.page = init_browser()
yield attempt
except ShutdownRequested:
self._should_continue = False
raise
except Exception as e:
log.error(f"浏览器初始化失败: {e}")
if attempt >= self.max_retries - 1:
raise
def handle_error(self, error: Exception):
"""处理错误,决定是否继续重试"""
# 如果是停止请求,直接停止
if isinstance(error, ShutdownRequested):
self._should_continue = False
return
log.error(f"流程异常: {error}")
if self.current_attempt >= self.max_retries - 1:
self._should_continue = False
else:
log.warning("准备重试...")
def stop(self):
"""停止重试"""
self._should_continue = False
def _cleanup_page(self):
"""清理当前页面"""
if self.page:
# 获取临时目录路径
temp_dir = getattr(self.page, '_temp_user_data_dir', None)
try:
self.page.quit()
except Exception:
pass
# 清理临时用户数据目录
if temp_dir and os.path.exists(temp_dir):
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
self.page = None
def cleanup(self):
"""最终清理"""
if self.page:
log.step("关闭浏览器...")
# 获取临时目录路径
temp_dir = getattr(self.page, '_temp_user_data_dir', None)
try:
self.page.quit()
except Exception:
pass
# 清理临时用户数据目录
if temp_dir and os.path.exists(temp_dir):
try:
import shutil
shutil.rmtree(temp_dir, ignore_errors=True)
except Exception:
pass
self.page = None
def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) -> bool:
"""等待页面稳定 (页面加载完成且 DOM 不再变化)
Args:
page: 浏览器页面对象
timeout: 超时时间 (秒)
check_interval: 检查间隔 (秒)
Returns:
bool: 是否稳定
"""
start_time = time.time()
last_html_len = 0
stable_count = 0
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
# 检查浏览器标签页是否还在加载favicon 旋转动画)
ready_state = page.run_js('return document.readyState', timeout=2)
if ready_state != 'complete':
stable_count = 0
time.sleep(check_interval)
continue
current_len = len(page.html)
if current_len == last_html_len:
stable_count += 1
if stable_count >= 3: # 连续 3 次检查都稳定
return True
else:
stable_count = 0
last_html_len = current_len
time.sleep(check_interval)
except ShutdownRequested:
raise
except Exception:
time.sleep(check_interval)
return False
def check_and_handle_error_page(page, max_retries: int = 2) -> bool:
"""检测并处理错误页面(如 Operation timed out
Args:
page: 浏览器页面对象
max_retries: 最大重试次数
Returns:
bool: 是否成功处理(页面恢复正常)
"""
for attempt in range(max_retries):
# 检测错误页面
error_text = page.ele('text:糟糕,出错了', timeout=1) or \
page.ele('text:Something went wrong', timeout=1) or \
page.ele('text:Operation timed out', timeout=1)
if not error_text:
return True # 没有错误,正常
log.warning(f"检测到错误页面,尝试重试 ({attempt + 1}/{max_retries})...")
# 点击重试按钮
retry_btn = page.ele('text:重试', timeout=2) or page.ele('text:Retry', timeout=1)
if retry_btn:
retry_btn.click()
time.sleep(3)
wait_for_page_stable(page, timeout=8)
else:
# 没有重试按钮,刷新页面
page.refresh()
time.sleep(3)
wait_for_page_stable(page, timeout=8)
# 最后再检查一次
error_text = page.ele('text:糟糕,出错了', timeout=1) or page.ele('text:Something went wrong', timeout=1)
return error_text is None
def wait_for_element(page, selector: str, timeout: int = 10, visible: bool = True):
"""智能等待元素出现
Args:
page: 浏览器页面对象
selector: CSS 选择器
timeout: 超时时间 (秒)
visible: 是否要求元素可见
Returns:
元素对象或 None
"""
start_time = time.time()
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
element = page.ele(selector, timeout=1)
if element:
if not visible or (element.states.is_displayed if hasattr(element, 'states') else True):
return element
except ShutdownRequested:
raise
except Exception:
pass
time.sleep(0.3)
return None
def wait_for_url_change(page, old_url: str, timeout: int = 15, contains: str = None) -> bool:
"""等待 URL 变化
Args:
page: 浏览器页面对象
old_url: 原始 URL
timeout: 超时时间 (秒)
contains: 新 URL 需要包含的字符串 (可选)
Returns:
bool: URL 是否已变化
"""
start_time = time.time()
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
current_url = page.url
if current_url != old_url:
if contains is None or contains in current_url:
return True
except ShutdownRequested:
raise
except Exception:
pass
time.sleep(0.5)
return False
def type_slowly(page, selector_or_element, text, base_delay=None):
"""缓慢输入文本 (模拟真人输入)
Args:
page: 浏览器页面对象 (用于重新获取元素)
selector_or_element: CSS 选择器字符串或元素对象
text: 要输入的文本
base_delay: 基础延迟 (秒),默认使用 TYPING_DELAY
"""
if base_delay is None:
base_delay = TYPING_DELAY
# 获取元素 (如果传入的是选择器则查找,否则直接使用)
if isinstance(selector_or_element, str):
element = page.ele(selector_or_element, timeout=10)
else:
element = selector_or_element
if not text:
return
# 对于短文本(如验证码),直接一次性输入,速度更快
if len(text) <= 8:
element.input(text, clear=True)
return
# 长文本使用逐字符输入
element.input(text[0], clear=True)
time.sleep(random.uniform(0.1, 0.2))
# 逐个输入剩余字符,不重新获取元素
for char in text[1:]:
element.input(char, clear=False)
# 随机延迟
actual_delay = base_delay * random.uniform(0.5, 1.2)
if char in ' @._-':
actual_delay *= 1.3
time.sleep(actual_delay)
def human_delay(min_sec: float = None, max_sec: float = None):
"""模拟人类操作间隔
Args:
min_sec: 最小延迟 (秒),默认使用 ACTION_DELAY[0]
max_sec: 最大延迟 (秒),默认使用 ACTION_DELAY[1]
"""
if min_sec is None:
min_sec = ACTION_DELAY[0]
if max_sec is None:
max_sec = ACTION_DELAY[1]
time.sleep(random.uniform(min_sec, max_sec))
def _detect_page_language(page) -> str:
"""检测页面语言
Returns:
str: 'zh' 中文, 'en' 英文
"""
try:
# 检查页面是否包含中文特征
html = page.html
# 检查中文关键词
chinese_keywords = ['确认', '继续', '登录', '注册', '验证', '邮箱', '密码', '姓名', '生日']
for keyword in chinese_keywords:
if keyword in html:
return 'zh'
return 'en'
except Exception:
return 'en' # 默认英文
def _input_birthday(page, birthday: dict):
"""智能输入生日 (自动适配中英文界面)
中文界面: yyyy/mm/dd (年/月/日)
英文界面: mm/dd/yyyy (月/日/年)
Args:
page: 浏览器页面对象
birthday: {'year': '2000', 'month': '01', 'day': '15'}
"""
# 获取三个输入框
year_input = wait_for_element(page, 'css:[data-type="year"]', timeout=10)
month_input = wait_for_element(page, 'css:[data-type="month"]', timeout=5)
day_input = wait_for_element(page, 'css:[data-type="day"]', timeout=5)
if not all([year_input, month_input, day_input]):
log.warning("未找到完整的生日输入框")
return False
# 检测页面语言
lang = _detect_page_language(page)
log.step(f"输入生日: {birthday['year']}/{birthday['month']}/{birthday['day']} (界面语言: {lang})")
# 根据语言决定输入顺序
if lang == 'zh':
# 中文: 年 -> 月 -> 日
input_order = [(year_input, birthday['year']),
(month_input, birthday['month']),
(day_input, birthday['day'])]
else:
# 英文: 月 -> 日 -> 年
input_order = [(month_input, birthday['month']),
(day_input, birthday['day']),
(year_input, birthday['year'])]
# 按顺序输入
for input_elem, value in input_order:
try:
input_elem.click()
time.sleep(0.15)
input_elem.input(value, clear=True)
time.sleep(0.2)
except Exception as e:
log.warning(f"生日输入异常: {e}")
log.success("生日已输入")
return True
def check_and_handle_error(page, max_retries=5) -> bool:
"""检查并处理页面错误 (带自动重试)"""
for attempt in range(max_retries):
try:
page_source = page.html.lower()
error_keywords = ['出错', 'error', 'timed out', 'operation timeout', 'route error', 'invalid content']
has_error = any(keyword in page_source for keyword in error_keywords)
if has_error:
try:
retry_btn = page.ele('css:button[data-dd-action-name="Try again"]', timeout=2)
if retry_btn:
log.warning(f"检测到错误页面,点击重试 ({attempt + 1}/{max_retries})...")
retry_btn.click()
wait_time = 3 + attempt # 递增等待,但减少基础时间
time.sleep(wait_time)
return True
except Exception:
time.sleep(1)
continue
return False
except Exception:
return False
return False
def retry_on_page_refresh(func):
"""装饰器: 页面刷新时自动重试"""
def wrapper(*args, **kwargs):
max_retries = 3
for attempt in range(max_retries):
try:
return func(*args, **kwargs)
except Exception as e:
error_msg = str(e).lower()
if '页面被刷新' in error_msg or 'page refresh' in error_msg or 'stale' in error_msg:
if attempt < max_retries - 1:
log.warning(f"页面刷新,重试操作 ({attempt + 1}/{max_retries})...")
time.sleep(1)
continue
raise
return None
return wrapper
def is_logged_in(page, timeout: int = 5) -> bool:
"""检测是否已登录 ChatGPT (通过 API 请求判断)
通过请求 /api/auth/session 接口判断:
- 已登录: 返回包含 user 字段的 JSON
- 未登录: 返回 {}
"""
try:
# 使用 JavaScript 请求 session API设置超时
result = page.run_js(f'''
return Promise.race([
fetch('/api/auth/session', {{
method: 'GET',
credentials: 'include'
}})
.then(r => r.json())
.then(data => JSON.stringify(data))
.catch(e => '{{}}'),
new Promise((_, reject) => setTimeout(() => reject('timeout'), {timeout * 1000}))
]).catch(() => '{{}}');
''', timeout=timeout + 2)
if result and result != '{}':
import json
data = json.loads(result)
if data.get('user') and data.get('accessToken'):
log.success(f"已登录: {data['user'].get('email', 'unknown')}")
return True
return False
except Exception as e:
log.warning(f"登录检测异常: {e}")
return False
def register_openai_account_api(email: str, password: str, proxy: str = None,
team_name: str = None, max_email_retries: int = 3) -> bool:
"""使用协议模式 (API) 注册 OpenAI 账号
如果验证码获取超时,会自动创建新邮箱重试(不进入浏览器模式)
Args:
email: 邮箱地址
password: 密码
proxy: 代理地址 (可选)
team_name: Team 名称 (用于邀请新邮箱)
max_email_retries: 验证码超时后最大重试次数 (创建新邮箱)
Returns:
bool: 是否成功
str: 如果返回 "new_email:xxx@xxx.com:password",表示使用了新邮箱
"""
if not API_MODE_AVAILABLE:
log.warning("协议模式不可用,回退到浏览器模式")
return None # 返回 None 表示需要回退
current_email = email
current_password = password
for retry in range(max_email_retries):
if retry > 0:
log.warning(f"验证码超时,尝试创建新邮箱 (重试 {retry}/{max_email_retries - 1})...")
# 创建新邮箱
from email_service import unified_create_email
new_email, new_password = unified_create_email()
if not new_email:
log.error("创建新邮箱失败")
continue
# 如果有 team_name邀请新邮箱到 Team
if team_name:
from team_service import invite_single_to_team
from config import TEAMS
# 查找 team 配置
team = None
for t in TEAMS:
if t.get("name") == team_name:
team = t
break
if team:
log.step(f"邀请新邮箱到 Team: {new_email}")
if not invite_single_to_team(new_email, team):
log.error("新邮箱邀请失败")
continue
log.success(f"新邮箱邀请成功: {new_email}")
current_email = new_email
current_password = new_password
log.info(f"[API模式] 开始注册 OpenAI 账号: {current_email}", icon="account")
# 生成随机姓名和生日
random_name = get_random_name()
birthday = get_random_birthday()
birthdate = f"{birthday['year']}-{birthday['month']}-{birthday['day']}"
log.step(f"姓名: {random_name}, 生日: {birthdate}")
# 验证码超时标志
verification_timeout = False
# 定义获取验证码的函数 (5秒超时)
def get_code(target_email):
nonlocal verification_timeout
progress_update(phase="注册", step="等待验证码...")
log.step("等待验证码邮件 (5秒超时)...")
# 使用较短的超时时间: 5 次快速重试,每次 1 秒
code, error, email_time = unified_get_verification_code(
target_email,
max_retries=5, # 5 次重试
interval=1 # 每次间隔 1 秒
)
if code:
log.success(f"获取到验证码: {code}")
return code
else:
verification_timeout = True
log.warning("验证码获取超时 (5秒)")
return None
# 执行 API 注册
try:
result = api_register_account_only(
email=current_email,
password=current_password,
real_name=random_name,
birthdate=birthdate,
get_verification_code_func=get_code,
proxy=proxy,
progress_callback=lambda msg: log.step(msg)
)
if result:
log.success(f"[API模式] 注册完成: {current_email}")
# 如果使用了新邮箱,返回特殊标记
if current_email != email:
return f"new_email:{current_email}:{current_password}"
return True
elif verification_timeout:
# 验证码超时,继续下一次重试(创建新邮箱)
log.warning("[API模式] 验证码超时,将创建新邮箱重试...")
continue
else:
log.warning("[API模式] 注册失败")
return False
except Exception as e:
log.error(f"[API模式] 注册异常: {e}")
if "验证码" in str(e) or "timeout" in str(e).lower():
# 验证码相关异常,继续重试
continue
return False
log.error(f"[API模式] 已重试 {max_email_retries} 次,全部失败")
return False
def register_openai_account_auto(page, email: str, password: str, use_api: bool = True,
proxy: str = None, team_name: str = None,
allow_fallback: bool = False) -> bool:
"""自动选择模式注册 OpenAI 账号
Args:
page: 浏览器实例 (用于浏览器模式回退)
email: 邮箱地址
password: 密码
use_api: 是否优先使用 API 模式
proxy: 代理地址 (API 模式使用)
team_name: Team 名称 (用于验证码超时时邀请新邮箱)
allow_fallback: 是否允许 API 失败后回退到浏览器模式 (默认 False)
Returns:
bool/str:
- True: 成功
- False: 失败
- "retry_new_email": API 失败,需要重新生成邮箱重试
- "new_email:xxx@xxx.com:password": 使用了新邮箱
- "domain_blacklisted": 域名被列入黑名单
"""
# 如果启用 API 模式且可用
if use_api and API_MODE_AVAILABLE:
result = register_openai_account_api(email, password, proxy, team_name)
if result is True:
return True
elif isinstance(result, str) and result.startswith("new_email:"):
# 使用了新邮箱,返回新邮箱信息
return result
elif result == "domain_blacklisted":
return "domain_blacklisted"
elif result is False:
if not allow_fallback:
log.warning("API 模式注册失败,需要重新生成邮箱重试")
return "retry_new_email"
log.warning("API 模式注册失败,回退到浏览器模式...")
# result is None 表示 API 模式不可用
elif result is None and not allow_fallback:
log.warning("API 模式不可用,需要重新生成邮箱重试")
return "retry_new_email"
# 如果不使用 API 或允许回退,使用浏览器模式
if not use_api or allow_fallback:
return register_openai_account(page, email, password)
return "retry_new_email"
def register_openai_account(page, email: str, password: str) -> bool:
"""使用浏览器注册 OpenAI 账号
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
bool: 是否成功
"""
log.info(f"开始注册 OpenAI 账号: {email}", icon="account")
try:
# 打开注册页面
url = "https://chatgpt.com"
log.step(f"打开 {url}")
page.get(url)
# 智能等待页面加载完成
wait_for_page_stable(page, timeout=8)
log_current_url(page, "页面加载完成", force=True)
# 检查页面是否正常加载
current_url = page.url
# 如果已经在 auth.openai.com说明页面正常直接继续
if "auth.openai.com" in current_url:
log.info("已跳转到认证页面")
else:
# 在 chatgpt.com检查是否有注册按钮
page_ok = page.ele('css:[data-testid="signup-button"]', timeout=1) or \
page.ele('text:免费注册', timeout=1) or \
page.ele('text:Sign up for free', timeout=1) or \
page.ele('text:登录', timeout=1) or \
page.ele('text:Log in', timeout=1)
if not page_ok:
log.warning("页面加载异常3秒后刷新...")
save_debug_screenshot(page, 'page_load_abnormal')
time.sleep(3)
page.refresh()
wait_for_page_stable(page, timeout=8)
log_current_url(page, "刷新后", force=True)
# 检测是否已登录 (通过 API 判断)
try:
if is_logged_in(page):
log.success("检测到已登录,跳过注册步骤")
return True
except Exception:
pass # 忽略登录检测异常,继续注册流程
# 点击"免费注册"按钮
log.step("点击免费注册...")
signup_btn = wait_for_element(page, 'css:[data-testid="signup-button"]', timeout=5)
if not signup_btn:
signup_btn = wait_for_element(page, 'text:免费注册', timeout=3)
if not signup_btn:
signup_btn = wait_for_element(page, 'text:Sign up for free', timeout=3)
if signup_btn:
old_url = page.url
signup_btn.click()
# 等待 URL 变化或弹窗/输入框出现 (最多3秒快速检测)
for _ in range(6):
time.sleep(0.5)
if page.url != old_url:
log_url_change(page, old_url, "点击注册按钮")
break
# 检测弹窗中的邮箱输入框
try:
email_input = page.ele('css:input[type="email"], input[name="email"]', timeout=1)
if email_input and email_input.states.is_displayed:
break
except Exception:
pass
current_url = page.url
log_current_url(page, "注册按钮点击后")
# 如果没有跳转到 auth.openai.com检查是否在 chatgpt.com 弹窗中
if "auth.openai.com" not in current_url and "chatgpt.com" in current_url:
log.step("尝试在当前弹窗中输入邮箱...")
# 等待弹窗完全加载 (增加等待时间)
time.sleep(2)
# 快速检查弹窗是否正常加载(包含登录表单)- 增加更多选择器
login_form = wait_for_element(page, 'css:[data-testid="login-form"]', timeout=3)
if not login_form:
login_form = page.ele('text:登录或注册', timeout=2) or \
page.ele('text:Log in or sign up', timeout=2) or \
page.ele('text:Welcome back', timeout=1) or \
page.ele('text:欢迎回来', timeout=1) or \
page.ele('css:input[type="email"]', timeout=2) or \
page.ele('css:input[name="email"]', timeout=1)
if not login_form:
# 弹窗内容异常,关闭并刷新页面重试
log.warning("弹窗内容异常,刷新页面重试...")
save_debug_screenshot(page, 'popup_abnormal')
close_btn = page.ele('css:button[aria-label="Close"], button[aria-label="关闭"]', timeout=1)
if not close_btn:
close_btn = page.ele('css:button:has(svg)', timeout=1)
if close_btn:
close_btn.click()
time.sleep(0.5)
# 刷新页面
page.refresh()
wait_for_page_stable(page, timeout=8)
log_current_url(page, "刷新后", force=True)
# 重新点击注册按钮
log.step("重新点击免费注册...")
signup_btn = wait_for_element(page, 'css:[data-testid="signup-button"]', timeout=8) or \
wait_for_element(page, 'text:免费注册', timeout=5) or \
wait_for_element(page, 'text:Sign up for free', timeout=3) or \
wait_for_element(page, 'text:Get started', timeout=2)
if signup_btn:
signup_btn.click()
time.sleep(3) # 增加等待时间
# 再次检查弹窗
login_form = page.ele('css:[data-testid="login-form"]', timeout=5) or \
page.ele('text:登录或注册', timeout=3) or \
page.ele('css:input[type="email"]', timeout=3)
if not login_form:
log.error("重试后弹窗仍然异常,跳过此账号")
save_debug_screenshot(page, 'popup_retry_failed')
return False
else:
log.error("找不到注册按钮,跳过此账号")
save_debug_screenshot(page, 'signup_button_not_found')
return False
# 尝试输入邮箱
email_input = wait_for_element(page, 'css:input[type="email"], input[name="email"], input[id="email"]', timeout=5)
if email_input:
human_delay()
type_slowly(page, 'css:input[type="email"], input[name="email"], input[id="email"]', email)
log.success("邮箱已输入")
# 点击继续
human_delay(0.5, 1.0)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10, contains="/password")
# === 使用循环处理整个注册流程 ===
max_steps = 10 # 防止无限循环
for step in range(max_steps):
current_url = page.url
log_current_url(page, f"注册流程步骤 {step + 1}")
# 如果在 chatgpt.com 且已登录,注册成功
if "chatgpt.com" in current_url and "auth.openai.com" not in current_url:
try:
if is_logged_in(page):
log.success("检测到已登录,账号已注册成功")
return True
except Exception:
pass
# 步骤1: 输入邮箱 (在 log-in-or-create-account 页面)
if "auth.openai.com/log-in-or-create-account" in current_url:
progress_update(phase="注册", step="输入邮箱...")
log.step("等待邮箱输入框...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=15)
if not email_input:
log.error("无法找到邮箱输入框")
return False
human_delay() # 模拟人类思考时间
log.step("输入邮箱...")
type_slowly(page, 'css:input[type="email"]', email)
log.success("邮箱已输入")
# 点击继续
human_delay(0.5, 1.2)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
# 步骤2: 输入密码 (在密码页面: log-in/password 或 create-account/password)
if "auth.openai.com/log-in/password" in current_url or "auth.openai.com/create-account/password" in current_url:
progress_update(phase="注册", step="输入密码...")
# 先检查是否有密码错误提示,如果有则使用一次性验证码登录
try:
error_text = page.ele('text:Incorrect email address or password', timeout=1)
if error_text and error_text.states.is_displayed:
log.warning("密码错误,尝试使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=3)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if otp_btn:
old_url = page.url
otp_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
# 检查密码框是否已有内容(避免重复输入)
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=5)
if not password_input:
log.error("无法找到密码输入框")
return False
# 检查是否已输入密码
try:
current_value = password_input.attr('value') or ''
if len(current_value) > 0:
log.info("密码已输入,点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
log.step("等待密码输入框...")
human_delay() # 模拟人类思考时间
log.step("输入密码...")
type_slowly(page, 'css:input[type="password"]', password)
log.success("密码已输入")
# 点击继续
human_delay(0.5, 1.2)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
# 等待页面变化,检测是否密码错误
time.sleep(2)
# 检查是否出现密码错误提示
try:
error_text = page.ele('text:Incorrect email address or password', timeout=1)
if error_text and error_text.states.is_displayed:
log.warning("密码错误,尝试使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=3)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if otp_btn:
otp_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
wait_for_url_change(page, old_url, timeout=10)
continue
# 步骤3: 验证码页面
if "auth.openai.com/email-verification" in current_url:
break # 跳出循环,进入验证码流程
# 步骤4: 姓名/年龄页面 (账号已存在)
if "auth.openai.com/about-you" in current_url:
break # 跳出循环,进入补充信息流程
# 处理错误
if check_and_handle_error(page):
time.sleep(0.5)
continue
# 短暂等待页面变化
time.sleep(0.5)
# === 根据 URL 快速判断页面状态 ===
current_url = page.url
# 如果是 chatgpt.com 首页,说明已注册成功
if "chatgpt.com" in current_url and "auth.openai.com" not in current_url:
try:
if is_logged_in(page):
log.success("检测到已登录,账号已注册成功")
return True
except Exception:
pass
# 检测到姓名/年龄输入页面 (账号已存在,只需补充信息)
if "auth.openai.com/about-you" in current_url:
progress_update(phase="注册", step="补充个人信息...")
log_current_url(page, "个人信息页面")
log.info("检测到姓名输入页面,账号已存在,补充信息...")
# 等待页面加载
name_input = wait_for_element(page, 'css:input[name="name"]', timeout=5)
if not name_input:
name_input = wait_for_element(page, 'css:input[autocomplete="name"]', timeout=3)
# 输入姓名
random_name = get_random_name()
log.step(f"输入姓名: {random_name}")
type_slowly(page, 'css:input[name="name"], input[autocomplete="name"]', random_name)
# 输入生日 (使用智能适配函数)
birthday = get_random_birthday()
_input_birthday(page, birthday)
# 点击提交
log.step("点击最终提交...")
time.sleep(0.5)
submit_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if submit_btn:
submit_btn.click()
time.sleep(2)
log.success(f"注册完成: {email}")
return True
# 检测到验证码页面
needs_verification = "auth.openai.com/email-verification" in current_url
if needs_verification:
log_current_url(page, "邮箱验证码页面")
if not needs_verification:
# 检查验证码输入框是否存在
code_input = wait_for_element(page, 'css:input[name="code"]', timeout=3)
if code_input:
needs_verification = True
log_current_url(page, "邮箱验证码页面")
# 只有在 chatgpt.com 页面且已登录才能判断为成功
if not needs_verification:
try:
if "chatgpt.com" in page.url and is_logged_in(page):
log.success("账号已注册成功")
return True
except Exception:
pass
log.error("注册流程异常,未到达预期页面")
return False
# 获取验证码
progress_update(phase="注册", step="等待验证码...")
log.step("等待验证码邮件...")
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if not verification_code:
log.error("无法获取验证码")
return False
# 验证码重试循环 (最多重试 3 次)
max_code_retries = 3
for code_attempt in range(max_code_retries):
# 输入验证码
progress_update(phase="注册", step="输入验证码...")
log.step(f"输入验证码: {verification_code}")
while check_and_handle_error(page):
time.sleep(1)
# 重新获取输入框 (可能页面已刷新)
code_input = wait_for_element(page, 'css:input[name="code"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[placeholder*="代码"]', timeout=5)
if not code_input:
# 再次检查是否已登录
try:
if is_logged_in(page):
log.success("检测到已登录,跳过验证码输入")
return True
except Exception:
pass
log.error("无法找到验证码输入框")
return False
# 清空并输入验证码
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="code"], input[placeholder*="代码"]', verification_code, base_delay=0.08)
time.sleep(0.5)
# 点击继续
log.step("点击继续...")
for attempt in range(3):
try:
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=10)
if continue_btn:
continue_btn.click()
break
except Exception:
time.sleep(0.5)
time.sleep(2)
# 检查是否出现"代码不正确"错误
try:
error_text = page.ele('text:代码不正确', timeout=1)
if not error_text:
error_text = page.ele('text:incorrect', timeout=1)
if not error_text:
error_text = page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
# 点击"重新发送电子邮件"
resend_btn = page.ele('text:重新发送电子邮件', timeout=3)
if not resend_btn:
resend_btn = page.ele('text:Resend email', timeout=2)
if not resend_btn:
resend_btn = page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
# 重新获取验证码
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if verification_code:
continue # 继续下一次尝试
log.warning("无法重新发送验证码")
else:
log.error("验证码多次错误,放弃")
return False
else:
# 没有错误,验证码正确,跳出循环
break
except Exception:
# 没有检测到错误,继续
break
while check_and_handle_error(page):
time.sleep(0.5)
# 记录当前页面 (应该是 about-you 个人信息页面)
log_current_url(page, "验证码通过后-个人信息页面")
# 输入姓名 (随机外国名字)
random_name = get_random_name()
log.step(f"输入姓名: {random_name}")
name_input = wait_for_element(page, 'css:input[name="name"]', timeout=15)
if not name_input:
name_input = wait_for_element(page, 'css:input[autocomplete="name"]', timeout=5)
type_slowly(page, 'css:input[name="name"], input[autocomplete="name"]', random_name)
# 输入生日 (使用智能适配函数)
birthday = get_random_birthday()
_input_birthday(page, birthday)
# 最终提交
log.step("点击最终提交...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=10)
if continue_btn:
continue_btn.click()
# 等待并检查是否出现 "email not supported" 错误
time.sleep(2)
try:
error_text = page.ele('text:The email you provided is not supported', timeout=2)
if error_text and error_text.states.is_displayed:
log.error("邮箱域名不被支持,需要加入黑名单")
return "domain_blacklisted"
except Exception:
pass
log.success(f"注册完成: {email}")
time.sleep(1)
return True
except Exception as e:
log.error(f"注册失败: {e}")
return False
def perform_codex_authorization(page, email: str, password: str) -> dict:
"""执行 Codex 授权流程
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
dict: codex_data 或 None
"""
log.info(f"开始 Codex 授权: {email}", icon="code")
# 生成授权 URL
auth_url, session_id = crs_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取授权 URL")
return None
# 打开授权页面
log.step("打开授权页面...")
log.info(f"[URL] 授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
# 再次检测错误页面
check_and_handle_error_page(page)
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
# 可能是错误页面,再检测一次
if check_and_handle_error_page(page):
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=5)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if not email_input:
email_input = wait_for_element(page, '#email', timeout=5)
type_slowly(page, 'css:input[type="email"], input[name="email"], #email', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "输入邮箱后点击继续")
except Exception as e:
log.warning(f"邮箱输入步骤异常: {e}")
log_current_url(page, "邮箱步骤完成后")
# 检测错误页面
if check_and_handle_error_page(page):
# 错误重试后,检查当前页面状态
current_url = page.url
# 如果回到了登录页面,需要重新输入邮箱
if "auth.openai.com/log-in" in current_url and "/password" not in current_url:
log.info("重试后回到登录页,重新输入邮箱...")
try:
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"]', email, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
except Exception as e:
log.warning(f"重新输入邮箱异常: {e}")
# 再次检查当前 URL确定下一步
current_url = page.url
# 只有在密码页面才输入密码
if "/password" in current_url or "log-in/password" in current_url or "create-account/password" in current_url:
try:
# 输入密码
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if not password_input:
# 可能是错误页面
if check_and_handle_error_page(page):
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=5)
if not password_input:
password_input = wait_for_element(page, 'css:input[name="password"]', timeout=5)
if password_input:
type_slowly(page, 'css:input[type="password"], input[name="password"]', password, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "输入密码后点击继续")
except Exception as e:
log.warning(f"密码输入步骤异常: {e}")
else:
# 不在密码页面,可能需要先输入邮箱
log.info(f"当前不在密码页面: {current_url}")
try:
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=3)
if email_input:
log.step("输入邮箱...")
type_slowly(page, 'css:input[type="email"]', email, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
# 现在应该在密码页面了
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
log.step("输入密码...")
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
except Exception as e:
log.warning(f"登录流程异常: {e}")
log_current_url(page, "密码步骤完成后")
# 等待授权回调
max_wait = 45 # 减少等待时间
start_time = time.time()
code = None
progress_shown = False
last_url_in_loop = None
log.step(f"等待授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录URL变化
if current_url != last_url_in_loop:
log_current_url(page, "等待回调中")
last_url_in_loop = current_url
# 检查是否到达回调页面
if "localhost:1455/auth/callback" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
log.success("获取到回调 URL")
log.info(f"[URL] 回调地址: {current_url}", icon="browser")
code = extract_code_from_url(current_url)
if code:
log.success("提取授权码成功")
break
# 尝试点击授权按钮
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
time.sleep(1.5) # 减少等待
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5) # 减少轮询间隔
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"检查异常: {e}")
time.sleep(1.5)
if not code:
if progress_shown:
log.progress_clear()
log.warning("授权超时")
try:
current_url = page.url
if "code=" in current_url:
code = extract_code_from_url(current_url)
except Exception:
pass
if not code:
log.error("无法获取授权码")
return None
# 交换 tokens
log.step("交换 tokens...")
codex_data = crs_exchange_code(code, session_id)
if codex_data:
log.success("Codex 授权成功")
return codex_data
else:
log.error("Token 交换失败")
return None
def perform_codex_authorization_with_otp(page, email: str) -> dict:
"""执行 Codex 授权流程 (使用一次性验证码登录,适用于已注册的 Team Owner)
Args:
page: 浏览器页面实例
email: 邮箱地址
Returns:
dict: codex_data 或 None
"""
log.info("开始 Codex 授权 (OTP 登录)...", icon="auth")
# 生成授权 URL
auth_url, session_id = crs_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取授权 URL")
return None
# 打开授权页面
log.step("打开授权页面...")
log.info(f"[URL] 授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "OTP授权页面加载完成", force=True)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
type_slowly(page, 'css:input[type="email"], input[name="email"], #email', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "OTP流程-输入邮箱后")
except Exception as e:
log.warning(f"邮箱输入步骤异常: {e}")
log_current_url(page, "OTP流程-邮箱步骤完成后")
try:
# 检查是否在密码页面,如果是则点击"使用一次性验证码登录"
current_url = page.url
if "/log-in/password" in current_url or "/password" in current_url:
log.step("检测到密码页面,点击使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=5)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if not otp_btn:
# 尝试通过按钮文本查找
buttons = page.eles('css:button')
for btn in buttons:
btn_text = btn.text.lower()
if '一次性验证码' in btn_text or 'one-time' in btn_text:
otp_btn = btn
break
if otp_btn:
old_url = page.url
otp_btn.click()
log.success("已点击一次性验证码登录按钮")
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "点击OTP按钮后")
else:
log.warning("未找到一次性验证码登录按钮")
else:
# 不在密码页面,尝试直接找 OTP 按钮
log.step("点击使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'css:button[value="passwordless_login_send_otp"]', timeout=10)
if not otp_btn:
otp_btn = wait_for_element(page, 'css:button._inlinePasswordlessLogin', timeout=5)
if not otp_btn:
buttons = page.eles('css:button')
for btn in buttons:
if '一次性验证码' in btn.text or 'one-time' in btn.text.lower():
otp_btn = btn
break
if otp_btn:
otp_btn.click()
log.success("已点击一次性验证码登录按钮")
time.sleep(2)
else:
log.warning("未找到一次性验证码登录按钮,尝试继续...")
except Exception as e:
log.warning(f"点击 OTP 按钮异常: {e}")
log_current_url(page, "OTP流程-准备获取验证码")
# 等待并获取验证码
log.step("等待验证码邮件...")
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
log.warning(f"自动获取验证码失败: {error}")
# 手动输入
verification_code = input("⚠️ 请手动输入验证码: ").strip()
if not verification_code:
log.error("未输入验证码")
return None
# 验证码重试循环 (最多重试 3 次)
max_code_retries = 3
for code_attempt in range(max_code_retries):
try:
# 输入验证码
log.step(f"输入验证码: {verification_code}")
code_input = wait_for_element(page, 'css:input[name="otp"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[type="text"]', timeout=5)
if not code_input:
code_input = wait_for_element(page, 'css:input[autocomplete="one-time-code"]', timeout=5)
if code_input:
# 清空并输入验证码
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="otp"], input[type="text"], input[autocomplete="one-time-code"]', verification_code, base_delay=0.08)
log.success("验证码已输入")
else:
log.error("未找到验证码输入框")
return None
# 点击继续/验证按钮
log.step("点击继续...")
time.sleep(1)
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
time.sleep(2)
# 检查是否出现"代码不正确"错误
try:
error_text = page.ele('text:代码不正确', timeout=1)
if not error_text:
error_text = page.ele('text:incorrect', timeout=1)
if not error_text:
error_text = page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
# 点击"重新发送电子邮件"
resend_btn = page.ele('text:重新发送电子邮件', timeout=3)
if not resend_btn:
resend_btn = page.ele('text:Resend email', timeout=2)
if not resend_btn:
resend_btn = page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
# 重新获取验证码
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if verification_code:
continue # 继续下一次尝试
log.warning("无法重新发送验证码")
else:
log.error("验证码多次错误,放弃")
return None
else:
# 没有错误,验证码正确,跳出循环
break
except Exception:
# 没有检测到错误元素,说明验证码正确,继续
break
except Exception as e:
log.warning(f"验证码输入步骤异常: {e}")
break
# 等待授权回调
max_wait = 45
start_time = time.time()
code = None
progress_shown = False
last_url_in_loop = None
log.step(f"等待授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录URL变化
if current_url != last_url_in_loop:
log_current_url(page, "OTP流程-等待回调中")
last_url_in_loop = current_url
# 检查是否到达回调页面
if "localhost:1455/auth/callback" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
log.success("获取到回调 URL")
log.info(f"[URL] 回调地址: {current_url}", icon="browser")
code = extract_code_from_url(current_url)
if code:
log.success("提取授权码成功")
break
# 尝试点击授权按钮
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
time.sleep(1.5)
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"检查异常: {e}")
time.sleep(1.5)
if not code:
if progress_shown:
log.progress_clear()
log.warning("授权超时")
try:
current_url = page.url
if "code=" in current_url:
code = extract_code_from_url(current_url)
except Exception:
pass
if not code:
log.error("无法获取授权码")
return None
# 交换 tokens
log.step("交换 tokens...")
codex_data = crs_exchange_code(code, session_id)
if codex_data:
log.success("Codex 授权成功 (OTP)")
return codex_data
else:
log.error("Token 交换失败")
return None
def login_and_authorize_with_otp(email: str) -> tuple[bool, dict]:
"""Team Owner 专用: 使用一次性验证码登录并完成 Codex 授权
Args:
email: 邮箱地址
Returns:
tuple: (success, codex_data)
- CRS 模式: codex_data 包含 tokens
- CPA 模式: codex_data 为 None (后台自动处理)
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 使用 OTP 登录
success = perform_cpa_authorization_with_otp(ctx.page, email)
if success:
return True, None # CPA 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("CPA OTP 授权失败,准备重试...")
continue
return False, None
else:
# CRS 模式: 使用 OTP 登录
codex_data = perform_codex_authorization_with_otp(ctx.page, email)
if codex_data:
return True, codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return False, None
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None
return False, None
def register_only(email: str, password: str, use_api_register: bool = True) -> str:
"""仅注册 OpenAI 账号,不进行授权 (用于并行注册 + 串行授权模式)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
Returns:
str: 注册结果
- "success": 注册成功
- "domain_blacklisted": 域名被列入黑名单
- "retry_new_email": API 失败,需要重新生成邮箱重试
- "failed": 注册失败
"""
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
ctx.page, email, password,
use_api=use_api_register
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
return "domain_blacklisted"
# 检查是否需要重新生成邮箱
if register_result == "retry_new_email":
ctx.stop()
return "retry_new_email"
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
return "failed"
# 注册成功
log.success(f"注册成功: {email}")
return "success"
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return "failed"
return "failed"
def register_and_authorize(email: str, password: str, use_api_register: bool = True,
team_name: str = None) -> tuple:
"""完整流程: 注册 OpenAI + Codex 授权 (带重试机制)
Args:
email: 邮箱地址
password: 密码
use_api_register: 是否优先使用 API 模式注册 (默认 True)
team_name: Team 名称 (用于验证码超时时邀请新邮箱)
Returns:
tuple: (register_success, codex_data, new_email_info)
- register_success: True/False/"domain_blacklisted"/"retry_new_email"
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
- new_email_info: 如果使用了新邮箱,返回 {"email": "xxx", "password": "xxx"},否则为 None
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
if AUTH_PROVIDER in ("cpa", "s2a"):
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
pass
# 用于跟踪是否使用了新邮箱
new_email_info = None
current_email = email
current_password = password
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# 注册 OpenAI (优先使用 API 模式)
register_result = register_openai_account_auto(
ctx.page, current_email, current_password,
use_api=use_api_register,
team_name=team_name
)
# 检查是否是域名黑名单错误
if register_result == "domain_blacklisted":
ctx.stop()
return "domain_blacklisted", None, None
# 检查是否需要重新生成邮箱
if register_result == "retry_new_email":
ctx.stop()
return "retry_new_email", None, None
# 检查是否使用了新邮箱
if isinstance(register_result, str) and register_result.startswith("new_email:"):
# 解析新邮箱信息: "new_email:xxx@xxx.com:password"
parts = register_result.split(":", 2)
if len(parts) >= 3:
current_email = parts[1]
current_password = parts[2]
new_email_info = {"email": current_email, "password": current_password}
log.success(f"使用新邮箱继续: {current_email}")
register_result = True
if not register_result:
if attempt < ctx.max_retries - 1:
log.warning("注册失败,准备重试...")
continue
return False, None, new_email_info
# 短暂等待确保注册完成
time.sleep(0.5)
# ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ==========
# 使用 keepalive 获取锁,防止等待期间浏览器连接断开
lock_acquired = False
if auth_lock:
log.step("等待授权回调锁...")
lock_acquired = acquire_lock_with_keepalive(auth_lock, ctx.page, timeout=180, check_interval=2.0)
if lock_acquired:
log.step("获取授权回调锁,开始授权...")
else:
log.error("获取授权回调锁超时或浏览器连接断开")
continue # 重试
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
# CPA 模式: 授权成功即完成,后台自动处理账号
success = perform_cpa_authorization(ctx.page, current_email, current_password)
return True, None, new_email_info if success else (True, None, new_email_info)
elif AUTH_PROVIDER == "s2a":
# S2A 模式: 授权成功即完成,后台自动处理账号
success = perform_s2a_authorization(ctx.page, current_email, current_password)
return True, None, new_email_info if success else (True, None, new_email_info)
else:
# CRS 模式: 需要 codex_data
codex_data = perform_codex_authorization(ctx.page, current_email, current_password)
return True, codex_data, new_email_info
finally:
if auth_lock and lock_acquired:
log.step("释放授权回调锁")
auth_lock.release()
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None, new_email_info
return False, None, new_email_info
def authorize_only(email: str, password: str) -> tuple[bool, dict]:
"""仅执行 Codex 授权 (适用于已注册但未授权的账号)
Args:
email: 邮箱地址
password: 密码
Returns:
tuple: (success, codex_data)
- CRS 模式: codex_data 包含 tokens
- CPA/S2A 模式: codex_data 为 None (后台自动处理)
"""
# 获取授权回调锁 (CPA/S2A 模式需要串行授权)
auth_lock = None
if AUTH_PROVIDER in ("cpa", "s2a"):
try:
import run
auth_lock = run._auth_callback_lock
except (ImportError, AttributeError):
pass
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
# ========== 授权流程 - CPA/S2A 需要串行执行 (避免回调端口冲突) ==========
# 使用 keepalive 获取锁,防止等待期间浏览器连接断开
lock_acquired = False
if auth_lock:
log.step("等待授权回调锁...")
lock_acquired = acquire_lock_with_keepalive(auth_lock, ctx.page, timeout=180, check_interval=2.0)
if lock_acquired:
log.step("获取授权回调锁,开始授权...")
else:
log.error("获取授权回调锁超时或浏览器连接断开")
continue # 重试
try:
# 根据配置选择授权方式
if AUTH_PROVIDER == "cpa":
log.info("已注册账号,使用 CPA 进行 Codex 授权...", icon="auth")
success = perform_cpa_authorization(ctx.page, email, password)
if success:
return True, None # CPA 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("CPA 授权失败,准备重试...")
continue
return False, None
elif AUTH_PROVIDER == "s2a":
log.info("已注册账号,使用 S2A 进行 Codex 授权...", icon="auth")
success = perform_s2a_authorization(ctx.page, email, password)
if success:
return True, None # S2A 模式不返回 codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("S2A 授权失败,准备重试...")
continue
return False, None
else:
# CRS 模式
log.info("已注册账号,直接进行 Codex 授权...", icon="auth")
codex_data = perform_codex_authorization(ctx.page, email, password)
if codex_data:
return True, codex_data
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return False, None
finally:
if auth_lock and lock_acquired:
log.step("释放授权回调锁")
auth_lock.release()
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return False, None
return False, None
# ==================== CPA 授权函数 ====================
def perform_cpa_authorization(page, email: str, password: str) -> bool:
"""执行 CPA 授权流程 (密码登录)
与 CRS 的关键差异:
- CRS 使用 session_idCPA 使用 state
- CRS 直接交换 code 得到 tokensCPA 提交整个回调 URL 然后轮询状态
- CPA 授权成功后不需要手动添加账号,后台自动处理
Args:
page: 浏览器实例
email: 邮箱地址
password: 密码
Returns:
bool: 授权是否成功
"""
log.info(f"开始 CPA 授权: {email}", icon="code")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
# 这里不再重复获取锁
try:
# 生成授权 URL
auth_url, state = cpa_generate_auth_url()
if not auth_url or not state:
log.error("无法获取 CPA 授权 URL")
return False
# 打开授权页面
log.step("打开 CPA 授权页面...")
log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "CPA授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入邮箱后点击继续")
except Exception as e:
log.warning(f"CPA 邮箱输入步骤异常: {e}")
log_current_url(page, "CPA-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-输入密码后点击继续")
except Exception as e:
log.warning(f"CPA 密码输入步骤异常: {e}")
log_current_url(page, "CPA-密码步骤完成后")
# 等待授权回调
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
auth_btn_clicked = False # 标记是否已点击授权按钮
log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录 URL 变化
if current_url != last_url_in_loop:
log_current_url(page, "CPA等待回调中")
last_url_in_loop = current_url
# URL 变化后重置授权按钮点击标记
auth_btn_clicked = False
# 检查是否到达回调页面 (CPA 使用 localhost:1455)
if is_cpa_callback_url(current_url):
if progress_shown:
log.progress_clear()
log.success("CPA 获取到回调 URL")
log.info(f"[URL] CPA回调地址: {current_url}", icon="browser")
callback_url = current_url
break
# 尝试点击授权按钮 (只点击一次)
if not auth_btn_clicked:
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
auth_btn_clicked = True # 标记已点击
time.sleep(2) # 等待页面响应
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[CPA等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"CPA检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("CPA 无法获取回调 URL")
return False
# CPA 特有流程: 提交回调 URL
log.step("提交 CPA 回调 URL...")
if not cpa_submit_callback(callback_url):
log.error("CPA 回调 URL 提交失败")
return False
# CPA 特有流程: 轮询授权状态
if cpa_poll_auth_status(state):
log.success("CPA Codex 授权成功")
return True
else:
log.error("CPA 授权状态检查失败")
return False
except Exception as e:
log.error(f"CPA 授权异常: {e}")
return False
def perform_cpa_authorization_with_otp(page, email: str) -> bool:
"""执行 CPA 授权流程 (使用一次性验证码登录)
Args:
page: 浏览器页面实例
email: 邮箱地址
Returns:
bool: 授权是否成功
"""
log.info("开始 CPA 授权 (OTP 登录)...", icon="auth")
# 生成授权 URL
auth_url, state = cpa_generate_auth_url()
if not auth_url or not state:
log.error("无法获取 CPA 授权 URL")
return False
# 打开授权页面
log.step("打开 CPA 授权页面...")
log.info(f"[URL] CPA授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "CPA-OTP授权页面加载完成", force=True)
try:
# 输入邮箱
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
type_slowly(page, 'css:input[type="email"], input[name="email"], #email', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-OTP流程-输入邮箱后")
except Exception as e:
log.warning(f"CPA OTP 邮箱输入步骤异常: {e}")
log_current_url(page, "CPA-OTP流程-邮箱步骤完成后")
try:
# 检查是否在密码页面,如果是则点击"使用一次性验证码登录"
current_url = page.url
if "/log-in/password" in current_url or "/password" in current_url:
log.step("检测到密码页面,点击使用一次性验证码登录...")
otp_btn = wait_for_element(page, 'text=使用一次性验证码登录', timeout=5)
if not otp_btn:
otp_btn = wait_for_element(page, 'text=Log in with a one-time code', timeout=3)
if not otp_btn:
buttons = page.eles('css:button')
for btn in buttons:
btn_text = btn.text.lower()
if '一次性验证码' in btn_text or 'one-time' in btn_text:
otp_btn = btn
break
if otp_btn:
old_url = page.url
otp_btn.click()
log.success("已点击一次性验证码登录按钮")
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "CPA-点击OTP按钮后")
else:
log.warning("未找到一次性验证码登录按钮")
except Exception as e:
log.warning(f"CPA 点击 OTP 按钮异常: {e}")
log_current_url(page, "CPA-OTP流程-准备获取验证码")
# 等待并获取验证码
log.step("等待验证码邮件...")
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
log.warning(f"自动获取验证码失败: {error}")
verification_code = input(" 请手动输入验证码: ").strip()
if not verification_code:
log.error("未输入验证码")
return False
# 验证码重试循环
max_code_retries = 3
for code_attempt in range(max_code_retries):
try:
log.step(f"输入验证码: {verification_code}")
code_input = wait_for_element(page, 'css:input[name="otp"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[type="text"]', timeout=5)
if not code_input:
code_input = wait_for_element(page, 'css:input[autocomplete="one-time-code"]', timeout=5)
if code_input:
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="otp"], input[type="text"], input[autocomplete="one-time-code"]', verification_code, base_delay=0.08)
log.success("验证码已输入")
else:
log.error("未找到验证码输入框")
return False
log.step("点击继续...")
time.sleep(1)
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
continue_btn.click()
time.sleep(2)
# 检查验证码错误
try:
error_text = page.ele('text:代码不正确', timeout=1) or \
page.ele('text:incorrect', timeout=1) or \
page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
resend_btn = page.ele('text:重新发送电子邮件', timeout=3) or \
page.ele('text:Resend email', timeout=2) or \
page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" 请手动输入验证码: ").strip()
if verification_code:
continue
log.warning("无法重新发送验证码")
else:
log.error("验证码多次错误,放弃")
return False
else:
break
except Exception:
break
except Exception as e:
log.warning(f"CPA OTP 验证码输入步骤异常: {e}")
break
# 等待授权回调
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
auth_btn_clicked = False # 标记是否已点击授权按钮
log.step(f"等待 CPA 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
if current_url != last_url_in_loop:
log_current_url(page, "CPA-OTP流程-等待回调中")
last_url_in_loop = current_url
# URL 变化后重置授权按钮点击标记
auth_btn_clicked = False
if is_cpa_callback_url(current_url):
if progress_shown:
log.progress_clear()
log.success("CPA 获取到回调 URL")
log.info(f"[URL] CPA回调地址: {current_url}", icon="browser")
callback_url = current_url
break
# 尝试点击授权按钮 (只点击一次)
if not auth_btn_clicked:
try:
buttons = page.eles('css:button[type="submit"]')
for btn in buttons:
if btn.states.is_displayed and btn.states.is_enabled:
btn_text = btn.text.lower()
if any(x in btn_text for x in ['allow', 'authorize', 'continue', '授权', '允许', '继续', 'accept']):
if progress_shown:
log.progress_clear()
progress_shown = False
log.step(f"点击按钮: {btn.text}")
btn.click()
auth_btn_clicked = True # 标记已点击
time.sleep(2) # 等待页面响应
break
except Exception:
pass
elapsed = int(time.time() - start_time)
log.progress_inline(f"[CPA-OTP等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"CPA OTP 检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("CPA OTP 无法获取回调 URL")
return False
# CPA 特有流程: 提交回调 URL
log.step("提交 CPA 回调 URL...")
if not cpa_submit_callback(callback_url):
log.error("CPA 回调 URL 提交失败")
return False
# CPA 特有流程: 轮询授权状态
if cpa_poll_auth_status(state):
log.success("CPA Codex 授权成功 (OTP)")
return True
else:
log.error("CPA 授权状态检查失败")
return False
# ==================== S2A 授权函数 ====================
def perform_s2a_authorization(page, email: str, password: str) -> bool:
"""执行 S2A 授权流程 (密码登录)
Args:
page: 浏览器实例 (API 模式下可为 None)
email: 邮箱地址
password: 密码
Returns:
bool: 授权是否成功
"""
# 检查是否使用 API 模式
if S2A_API_MODE:
log.info(f"使用 S2A API 模式授权: {email}", icon="code")
success, result = s2a_api_authorize(email, password)
return success
# 以下是浏览器模式
log.info(f"开始 S2A 浏览器授权: {email}", icon="code")
progress_update(phase="授权", step="开始 S2A 授权...")
# 注意: 授权回调锁已在上层函数 (register_and_authorize/authorize_only) 中获取
# 这里不再重复获取锁
try:
# 生成授权 URL
auth_url, session_id = s2a_generate_auth_url()
if not auth_url or not session_id:
log.error("无法获取 S2A 授权 URL")
return False
# 打开授权页面
progress_update(phase="授权", step="打开授权页面...")
log.step("打开 S2A 授权页面...")
log.info(f"[URL] S2A授权URL: {auth_url}", icon="browser")
page.get(auth_url)
wait_for_page_stable(page, timeout=5)
log_current_url(page, "S2A授权页面加载完成", force=True)
# 检测错误页面
check_and_handle_error_page(page)
try:
# 输入邮箱
progress_update(phase="授权", step="输入邮箱...")
log.step("输入邮箱...")
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=10)
if not email_input:
email_input = wait_for_element(page, 'css:input[name="email"]', timeout=5)
if email_input:
type_slowly(page, 'css:input[type="email"], input[name="email"]', email, base_delay=0.06)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入邮箱后点击继续")
except Exception as e:
log.warning(f"S2A 邮箱输入步骤异常: {e}")
log_current_url(page, "S2A-邮箱步骤完成后")
# 输入密码
current_url = page.url
if "/password" in current_url:
try:
progress_update(phase="授权", step="输入密码...")
log.step("输入密码...")
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=10)
if password_input:
type_slowly(page, 'css:input[type="password"]', password, base_delay=0.06)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=8)
log_url_change(page, old_url, "S2A-输入密码后点击继续")
except Exception as e:
log.warning(f"S2A 密码输入步骤异常: {e}")
log_current_url(page, "S2A-密码步骤完成后")
# 检查是否需要邮箱验证码
time.sleep(1)
current_url = page.url
if "/email-verification" in current_url:
log.step("检测到邮箱验证码页面,开始获取验证码...")
progress_update(phase="授权", step="等待验证码...")
# 获取验证码 (使用较短的轮询参数,加快响应)
verification_code, error, email_time = unified_get_verification_code(
email, max_retries=10, interval=2
)
if not verification_code:
log.warning(f"自动获取验证码失败: {error}")
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if not verification_code:
log.error("S2A 无法获取邮箱验证码")
return False
# 验证码重试循环 (最多重试 3 次)
max_code_retries = 3
for code_attempt in range(max_code_retries):
log.step(f"输入验证码: {verification_code}")
# 查找验证码输入框
code_input = wait_for_element(page, 'css:input[name="code"]', timeout=10)
if not code_input:
code_input = wait_for_element(page, 'css:input[autocomplete="one-time-code"]', timeout=5)
if not code_input:
code_input = wait_for_element(page, 'css:input[inputmode="numeric"]', timeout=5)
if not code_input:
log.error("S2A 无法找到验证码输入框")
return False
# 清空并输入验证码
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="code"], input[autocomplete="one-time-code"], input[inputmode="numeric"]', verification_code, base_delay=0.08)
time.sleep(0.5)
# 点击继续
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=10)
if continue_btn:
old_url = page.url
continue_btn.click()
time.sleep(2)
# 检查是否出现验证码错误
try:
error_text = page.ele('text:代码不正确', timeout=1)
if not error_text:
error_text = page.ele('text:incorrect', timeout=1)
if not error_text:
error_text = page.ele('text:Invalid code', timeout=1)
if error_text and error_text.states.is_displayed:
if code_attempt < max_code_retries - 1:
log.warning(f"S2A 验证码错误,尝试重新获取 ({code_attempt + 1}/{max_code_retries})...")
# 点击重新发送
resend_btn = page.ele('text:重新发送电子邮件', timeout=3)
if not resend_btn:
resend_btn = page.ele('text:Resend email', timeout=2)
if not resend_btn:
resend_btn = page.ele('text:resend', timeout=2)
if resend_btn:
resend_btn.click()
log.info("已点击重新发送,等待新验证码...")
time.sleep(3)
verification_code, error, email_time = unified_get_verification_code(email)
if not verification_code:
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if verification_code:
continue
log.warning("S2A 无法重新发送验证码")
else:
log.error("S2A 验证码多次错误,放弃")
return False
else:
# 没有错误,验证码正确
break
except Exception:
# 没有检测到错误元素,说明验证码正确
break
# 等待 URL 变化
wait_for_url_change(page, old_url, timeout=5)
log_url_change(page, old_url, "S2A-输入验证码后")
log_current_url(page, "S2A-验证码步骤完成后")
# 检查是否在授权确认页面,如果是则点击授权按钮
time.sleep(1)
current_url = page.url
if "/authorize" in current_url or "consent" in current_url:
try:
log.step("检测到授权确认页面,点击授权按钮...")
auth_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if auth_btn:
btn_text = auth_btn.text.lower() if auth_btn.text else ""
if 'authorize' in btn_text or '授权' in btn_text or 'allow' in btn_text or 'continue' in btn_text:
old_url = page.url
auth_btn.click()
time.sleep(2)
if page.url != old_url:
log_url_change(page, old_url, "S2A-点击授权按钮后")
except Exception as e:
log.warning(f"S2A 授权按钮点击异常: {e}")
# 等待授权回调 (S2A 使用 localhost 回调)
max_wait = 45
start_time = time.time()
callback_url = None
progress_shown = False
last_url_in_loop = None
auth_btn_clicked = False # 标记是否已点击授权按钮
error_detected = False # 标记是否检测到错误
progress_update(phase="授权", step="等待回调...")
log.step(f"等待 S2A 授权回调 (最多 {max_wait}s)...")
while time.time() - start_time < max_wait:
try:
current_url = page.url
# 记录 URL 变化
if current_url != last_url_in_loop:
log_current_url(page, "S2A等待回调中")
last_url_in_loop = current_url
# URL 变化后重置授权按钮点击标记
auth_btn_clicked = False
error_detected = False
# 检查是否到达回调页面 (S2A 使用 localhost:1455 或类似端口)
if "localhost" in current_url and "code=" in current_url:
if progress_shown:
log.progress_clear()
progress_shown = False
callback_url = current_url
log.success(f"捕获 S2A 回调 URL")
break
# 检测 OpenAI 错误页面
if not error_detected:
try:
error_elem = page.ele('text:Something went wrong', timeout=0.3) or \
page.ele('text:出错了', timeout=0.3) or \
page.ele('text:Access denied', timeout=0.3) or \
page.ele('text:rate limit', timeout=0.3)
if error_elem:
log.warning(f"检测到错误页面: {error_elem.text[:50] if error_elem.text else 'unknown'}")
error_detected = True
# 尝试刷新页面
page.refresh()
time.sleep(3)
continue
except Exception:
pass
# 检测错误
check_and_handle_error(page)
# 检查是否在邮箱验证码页面 (有几率出现)
if "/email-verification" in current_url:
if progress_shown:
log.progress_clear()
progress_shown = False
log.step("回调等待中检测到邮箱验证码页面...")
progress_update(phase="授权", step="等待验证码...")
verification_code, error, email_time = unified_get_verification_code(
email, max_retries=10, interval=2
)
if not verification_code:
log.warning(f"自动获取验证码失败: {error}")
verification_code = input(" ⚠️ 请手动输入验证码: ").strip()
if not verification_code:
log.error("S2A 无法获取邮箱验证码")
return False
# 查找验证码输入框
code_input = wait_for_element(page, 'css:input[name="code"]', timeout=5)
if not code_input:
code_input = wait_for_element(page, 'css:input[autocomplete="one-time-code"]', timeout=3)
if not code_input:
code_input = wait_for_element(page, 'css:input[inputmode="numeric"]', timeout=3)
if code_input:
try:
code_input.clear()
except Exception:
pass
type_slowly(page, 'css:input[name="code"], input[autocomplete="one-time-code"], input[inputmode="numeric"]', verification_code, base_delay=0.08)
time.sleep(0.5)
submit_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if submit_btn:
old_url = page.url
submit_btn.click()
log.step("已提交验证码,等待页面跳转...")
wait_for_url_change(page, old_url, timeout=10)
log_url_change(page, old_url, "S2A-回调循环中输入验证码后")
else:
log.warning("S2A 回调循环中未找到验证码输入框")
# 重置标记,让循环重新检测新页面状态
auth_btn_clicked = False
error_detected = False
continue
# 检查是否需要点击 Authorize (只点击一次)
if not auth_btn_clicked:
try:
auth_btn = page.ele('css:button[type="submit"]', timeout=0.5)
if auth_btn:
btn_text = auth_btn.text.lower() if auth_btn.text else ""
btn_enabled = auth_btn.states.is_enabled if hasattr(auth_btn, 'states') else True
btn_displayed = auth_btn.states.is_displayed if hasattr(auth_btn, 'states') else True
if 'authorize' in btn_text or '授权' in btn_text or 'continue' in btn_text or 'allow' in btn_text:
if btn_enabled and btn_displayed:
log.step(f"点击授权按钮: '{auth_btn.text}'")
old_url = page.url
auth_btn.click()
auth_btn_clicked = True # 标记已点击
time.sleep(3) # 增加等待时间
new_url = page.url
if new_url != old_url:
log_url_change(page, old_url, "S2A点击授权按钮后")
else:
log.warning(f"点击授权按钮后 URL 未变化,当前: {new_url[:80]}...")
else:
log.warning(f"授权按钮不可用: enabled={btn_enabled}, displayed={btn_displayed}")
except Exception as e:
log.warning(f"检查授权按钮异常: {e}")
elapsed = int(time.time() - start_time)
log.progress_inline(f"[S2A等待中... {elapsed}s]")
progress_shown = True
time.sleep(1.5)
except Exception as e:
if progress_shown:
log.progress_clear()
progress_shown = False
log.warning(f"S2A检查异常: {e}")
time.sleep(1.5)
if progress_shown:
log.progress_clear()
if not callback_url:
log.error("S2A 无法获取回调链接")
return False
# 从回调 URL 中提取 code
code = extract_code_from_url(callback_url)
if not code:
log.error("S2A 无法从回调链接提取授权码")
return False
# S2A 特有流程: 用授权码创建账号 (传入完整邮箱用于验证)
progress_update(phase="授权", step="提交授权码...")
log.step("正在提交 S2A 授权码...")
result = s2a_create_account_from_oauth(code, session_id, name=email)
if result:
log.success("S2A 授权流程完成")
return True
else:
log.error("S2A 账号入库失败")
return False
except Exception as e:
log.error(f"S2A 授权异常: {e}")
return False
# ==================== 格式3专用: 登录获取 Session ====================
def login_and_get_session(page, email: str, password: str) -> dict:
"""登录 ChatGPT 并获取 accessToken 和 account_id (格式3专用)
用于 team.json 格式3 (只有邮箱和密码,没有 token) 的 Team Owner
登录后从 /api/auth/session 获取 token 和 account_id
Args:
page: 浏览器页面实例
email: 邮箱
password: 密码
Returns:
dict: {"token": "...", "account_id": "..."} 或 None
"""
log.info(f"登录获取 Session: {email}", icon="account")
try:
# 打开 ChatGPT 登录页
url = "https://chatgpt.com"
log.step(f"打开 {url}")
page.get(url)
wait_for_page_stable(page, timeout=8)
log_current_url(page, "登录页面加载完成", force=True)
# 检查是否已登录
if is_logged_in(page):
log.info("已登录,直接获取 Session...")
return _fetch_session_data(page)
# 点击登录按钮
log.step("点击登录...")
login_btn = wait_for_element(page, 'css:[data-testid="login-button"]', timeout=5)
if not login_btn:
login_btn = wait_for_element(page, 'text:登录', timeout=3)
if not login_btn:
login_btn = wait_for_element(page, 'text:Log in', timeout=3)
if login_btn:
old_url = page.url
login_btn.click()
# 等待页面变化
for _ in range(6):
time.sleep(0.5)
if page.url != old_url:
log_url_change(page, old_url, "点击登录按钮")
break
# 检测弹窗中的邮箱输入框
try:
email_input = page.ele('css:input[type="email"], input[name="email"]', timeout=1)
if email_input and email_input.states.is_displayed:
break
except Exception:
pass
current_url = page.url
log_current_url(page, "登录按钮点击后")
# 登录流程循环
max_steps = 10
for step in range(max_steps):
current_url = page.url
log_current_url(page, f"登录流程步骤 {step + 1}")
# 检查是否已登录成功
if "chatgpt.com" in current_url and "auth.openai.com" not in current_url:
if is_logged_in(page):
log.success("登录成功")
# 检查并选择工作空间
_check_and_select_workspace(page)
time.sleep(1)
return _fetch_session_data(page)
# 步骤1: 输入邮箱
if "auth.openai.com/log-in-or-create-account" in current_url or \
("chatgpt.com" in current_url and "auth.openai.com" not in current_url):
email_input = wait_for_element(page, 'css:input[type="email"]', timeout=5)
if email_input:
log.step("输入邮箱...")
human_delay()
type_slowly(page, 'css:input[type="email"]', email)
log.success("邮箱已输入")
human_delay(0.5, 1.0)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
# 步骤2: 输入密码
if "/password" in current_url:
password_input = wait_for_element(page, 'css:input[type="password"]', timeout=5)
if password_input:
# 检查是否已输入密码
try:
current_value = password_input.attr('value') or ''
if len(current_value) > 0:
log.info("密码已输入,点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
except Exception:
pass
log.step("输入密码...")
human_delay()
type_slowly(page, 'css:input[type="password"]', password)
log.success("密码已输入")
human_delay(0.5, 1.0)
log.step("点击继续...")
continue_btn = wait_for_element(page, 'css:button[type="submit"]', timeout=5)
if continue_btn:
old_url = page.url
continue_btn.click()
wait_for_url_change(page, old_url, timeout=10)
continue
# 处理错误
if check_and_handle_error(page):
time.sleep(0.5)
continue
# 检查是否出现工作空间选择页面
if _check_and_select_workspace(page):
# 选择工作空间后继续
time.sleep(1)
continue
time.sleep(0.5)
# 最终检查是否登录成功
if is_logged_in(page):
# 再次检查工作空间选择
_check_and_select_workspace(page)
time.sleep(1)
log.success("登录成功")
return _fetch_session_data(page)
log.error("登录流程未完成")
return None
except Exception as e:
log.error(f"登录失败: {e}")
return None
def _check_and_select_workspace(page) -> bool:
"""检查并选择工作空间
如果出现"启动工作空间"页面,点击第一个"打开"按钮
Returns:
bool: 是否处理了工作空间选择
"""
try:
# 检查是否有"启动工作空间"文字
workspace_text = page.ele('text:启动工作空间', timeout=2)
if not workspace_text:
workspace_text = page.ele('text:Launch workspace', timeout=1)
if not workspace_text:
return False
log.info("检测到工作空间选择页面")
# 直接点击第一个"打开"按钮
open_btn = page.ele('text:打开', timeout=2)
if not open_btn:
open_btn = page.ele('text:Open', timeout=1)
if open_btn:
log.step("选择第一个工作空间...")
open_btn.click()
# 等待页面加载完成
wait_for_page_stable(page, timeout=10)
# 检查是否进入了职业选择页面(说明工作空间选择成功)
if _is_job_selection_page(page):
log.success("已进入工作空间")
return True
log.warning("未找到打开按钮")
return False
except Exception as e:
log.warning(f"检查工作空间异常: {e}")
return False
def _is_job_selection_page(page) -> bool:
"""检查是否在职业选择页面
出现"你从事哪种工作?"说明工作空间选择成功
Returns:
bool: 是否在职业选择页面
"""
try:
job_text = page.ele('text:你从事哪种工作', timeout=2)
if not job_text:
job_text = page.ele('text:What kind of work do you do', timeout=1)
return bool(job_text)
except Exception:
return False
def _fetch_session_data(page) -> dict:
"""访问 session API 页面获取 token 和 account_id
Args:
page: 浏览器页面实例
Returns:
dict: {"token": "...", "account_id": "..."} 或 None
"""
try:
import json as json_module
# 直接访问 session API 页面
log.step("获取 Session 数据...")
page.get("https://chatgpt.com/api/auth/session")
time.sleep(1)
# 获取页面内容JSON
body = page.ele('tag:body', timeout=5)
if not body:
log.error("无法获取页面内容")
return None
text = body.text
if not text or text == '{}':
log.error("Session 数据为空")
return None
data = json_module.loads(text)
token = data.get('accessToken')
user = data.get('user', {})
account = data.get('account', {})
account_id = account.get('id') if account else None
if token:
log.success(f"获取 Session 成功: {user.get('email', 'unknown')}")
if account_id:
log.info(f" account_id: {account_id[:20]}...")
else:
log.warning(" account_id: 未获取到")
return {
"token": token,
"account_id": account_id or ""
}
else:
log.error("Session 中没有 token")
return None
except Exception as e:
log.error(f"获取 Session 失败: {e}")
return None
def login_and_authorize_team_owner(email: str, password: str, proxy: dict = None) -> dict:
"""格式3专用: 登录获取 token/account_id 并同时进行授权
Args:
email: 邮箱
password: 密码
proxy: 代理配置 (可选)
Returns:
dict: {
"success": True/False, # 授权是否成功
"token": "...",
"account_id": "...",
"authorized": True/False # 是否已授权
}
"""
from config import format_proxy_url
with browser_context_with_retry(max_browser_retries=2) as ctx:
for attempt in ctx.attempts():
try:
page = ctx.page
if proxy:
proxy_url = format_proxy_url(proxy)
if proxy_url:
log.info(f"使用代理: {proxy.get('host')}:{proxy.get('port')}")
# 步骤1: 登录获取 Session
session_data = login_and_get_session(page, email, password)
if not session_data:
if attempt < ctx.max_retries - 1:
log.warning("登录失败,准备重试...")
continue
return {"success": False}
token = session_data["token"]
account_id = session_data["account_id"]
# 步骤2: 进行授权
if AUTH_PROVIDER == "cpa":
success = perform_cpa_authorization(page, email, password)
return {
"success": success,
"token": token,
"account_id": account_id,
"authorized": success
}
else:
codex_data = perform_codex_authorization(page, email, password)
if codex_data:
from crs_service import crs_add_account
crs_result = crs_add_account(email, codex_data)
return {
"success": bool(crs_result),
"token": token,
"account_id": account_id,
"authorized": bool(crs_result),
"crs_id": crs_result.get("id") if crs_result else None
}
else:
if attempt < ctx.max_retries - 1:
log.warning("授权失败,准备重试...")
continue
return {
"success": False,
"token": token,
"account_id": account_id,
"authorized": False
}
except Exception as e:
ctx.handle_error(e)
if ctx.current_attempt >= ctx.max_retries - 1:
return {"success": False}
return {"success": False}