feat(bot_notifier, browser_automation): Add debug screenshot capability and improve browser automation

- Add `_send_photo_to_all()` async method to send photos to all admin chat IDs
- Add `send_screenshot()` async method for sending debug screenshots via Telegram
- Add `send_screenshot_sync()` synchronous wrapper for non-async code integration
- Add `save_debug_screenshot()` function to capture and send screenshots when DEBUG_SCREENSHOT=true
- Add debug screenshot directory creation and timestamp-based file naming
- Improve browser initialization with automation detection bypass and realistic User-Agent
- Add `--disable-blink-features=AutomationControlled` flag to hide automation characteristics
- Set Mozilla/5.0 User-Agent string to mimic real Chrome browser
- Enhance OpenAI account registration flow with better error handling and debugging
- Add screenshot capture on page load failures and popup abnormalities
- Improve popup detection with additional CSS and text selectors for better reliability
- Increase timeout values for popup loading and form detection (1-3s to 2-5s)
- Add multiple fallback selectors for login form detection (email input fields, welcome text)
- Improve signup button retry logic with longer wait times and additional selector options
- Add screenshot capture on critical failures (popup retry failed, signup button not found)
This commit is contained in:
2026-01-16 00:07:40 +08:00
parent 99599134c0
commit 7e8c3784c1
2 changed files with 91 additions and 10 deletions

View File

@@ -200,6 +200,26 @@ class BotNotifier:
except TelegramError: except TelegramError:
pass pass
async def _send_photo_to_all(self, photo_path: str, caption: str = ""):
"""发送图片到所有管理员"""
for chat_id in self.chat_ids:
try:
with open(photo_path, 'rb') as photo:
await self.bot.send_photo(
chat_id=chat_id,
photo=photo,
caption=caption,
parse_mode="HTML"
)
except TelegramError:
pass
except FileNotFoundError:
pass
async def send_screenshot(self, photo_path: str, caption: str = ""):
"""发送调试截图"""
await self._send_photo_to_all(photo_path, caption)
def queue_message(self, message: str, level: str = "info"): def queue_message(self, message: str, level: str = "info"):
"""将消息加入发送队列 (非阻塞)""" """将消息加入发送队列 (非阻塞)"""
if self._message_queue: if self._message_queue:
@@ -292,6 +312,16 @@ def notify_sync(message: str, level: str = "info"):
_notifier.queue_message(message, level) _notifier.queue_message(message, level)
def send_screenshot_sync(photo_path: str, caption: str = ""):
"""同步方式发送截图 (供非异步代码使用)"""
if _notifier and _notifier._loop:
import asyncio
asyncio.run_coroutine_threadsafe(
_notifier.send_screenshot(photo_path, caption),
_notifier._loop
)
# ==================== 进度更新接口 (供 run.py 使用) ==================== # ==================== 进度更新接口 (供 run.py 使用) ====================
def progress_start(team_name: str, total: int) -> Optional[ProgressTracker]: def progress_start(team_name: str, total: int) -> Optional[ProgressTracker]:

View File

@@ -44,6 +44,38 @@ ACTION_DELAY = (1.0, 2.0) if SAFE_MODE else (0.3, 0.8) # 操作间隔范围
# ==================== URL 监听与日志 ==================== # ==================== URL 监听与日志 ====================
_last_logged_url = None # 记录上次日志的URL避免重复 _last_logged_url = None # 记录上次日志的URL避免重复
# ==================== 调试截图 ====================
DEBUG_SCREENSHOT = os.environ.get('DEBUG_SCREENSHOT', '').lower() == 'true'
SCREENSHOT_DIR = os.path.join(os.path.dirname(__file__), 'debug_screenshots')
def save_debug_screenshot(page, name: str):
"""保存调试截图并发送到 Telegram (仅在 DEBUG_SCREENSHOT=true 时生效)
Args:
page: 浏览器页面对象
name: 截图名称 (不含扩展名)
"""
if not DEBUG_SCREENSHOT:
return
try:
if not os.path.exists(SCREENSHOT_DIR):
os.makedirs(SCREENSHOT_DIR)
timestamp = time.strftime('%Y%m%d_%H%M%S')
filepath = os.path.join(SCREENSHOT_DIR, f'{timestamp}_{name}.png')
page.get_screenshot(filepath)
log.info(f"[DEBUG] 截图已保存: {filepath}")
# 发送截图到 Telegram Bot
try:
from bot_notifier import send_screenshot_sync
caption = f"🔍 <b>Debug Screenshot</b>\n<code>{name}</code>"
send_screenshot_sync(filepath, caption)
except Exception:
pass # Bot 未启动时忽略
except Exception as e:
log.warning(f"截图保存失败: {e}")
def log_current_url(page, context: str = None, force: bool = False): def log_current_url(page, context: str = None, force: bool = False):
"""记录当前页面URL (完整地址) """记录当前页面URL (完整地址)
@@ -211,6 +243,10 @@ def init_browser(max_retries: int = BROWSER_MAX_RETRIES) -> ChromiumPage:
co.set_argument('--disable-gpu') # 减少资源占用 co.set_argument('--disable-gpu') # 减少资源占用
co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题 co.set_argument('--disable-dev-shm-usage') # 避免共享内存问题
co.set_argument('--no-sandbox') # 服务器环境需要 co.set_argument('--no-sandbox') # 服务器环境需要
co.set_argument('--disable-blink-features=AutomationControlled') # 隐藏自动化特征
# 设置 User-Agent (模拟真实浏览器)
co.set_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36')
# Linux 服务器特殊配置 # Linux 服务器特殊配置
if is_linux: if is_linux:
@@ -690,6 +726,7 @@ def register_openai_account(page, email: str, password: str) -> bool:
page.ele('text:登录', timeout=1) # 也可能显示登录按钮 page.ele('text:登录', timeout=1) # 也可能显示登录按钮
if not page_ok: if not page_ok:
log.warning("页面加载异常3秒后刷新...") log.warning("页面加载异常3秒后刷新...")
save_debug_screenshot(page, 'page_load_abnormal')
time.sleep(3) time.sleep(3)
page.refresh() page.refresh()
wait_for_page_stable(page, timeout=8) wait_for_page_stable(page, timeout=8)
@@ -734,14 +771,23 @@ def register_openai_account(page, email: str, password: str) -> bool:
if "auth.openai.com" not in current_url and "chatgpt.com" in current_url: if "auth.openai.com" not in current_url and "chatgpt.com" in current_url:
log.step("尝试在当前弹窗中输入邮箱...") log.step("尝试在当前弹窗中输入邮箱...")
# 快速检查弹窗是否正常加载(包含登录表单) # 等待弹窗完全加载 (增加等待时间)
login_form = wait_for_element(page, 'css:[data-testid="login-form"]', timeout=1) time.sleep(2)
# 快速检查弹窗是否正常加载(包含登录表单)- 增加更多选择器
login_form = wait_for_element(page, 'css:[data-testid="login-form"]', timeout=3)
if not login_form: if not login_form:
login_form = page.ele('text:登录或注册', timeout=1) or page.ele('text:Log in or sign up', timeout=1) login_form = page.ele('text:登录或注册', timeout=2) or \
page.ele('text:Log in or sign up', timeout=2) or \
page.ele('text:Welcome back', timeout=1) or \
page.ele('text:欢迎回来', timeout=1) or \
page.ele('css:input[type="email"]', timeout=2) or \
page.ele('css:input[name="email"]', timeout=1)
if not login_form: if not login_form:
# 弹窗内容异常,关闭并刷新页面重试 # 弹窗内容异常,关闭并刷新页面重试
log.warning("弹窗内容异常,刷新页面重试...") log.warning("弹窗内容异常,刷新页面重试...")
save_debug_screenshot(page, 'popup_abnormal')
close_btn = page.ele('css:button[aria-label="Close"], button[aria-label="关闭"]', timeout=1) close_btn = page.ele('css:button[aria-label="Close"], button[aria-label="关闭"]', timeout=1)
if not close_btn: if not close_btn:
close_btn = page.ele('css:button:has(svg)', timeout=1) close_btn = page.ele('css:button:has(svg)', timeout=1)
@@ -756,19 +802,24 @@ def register_openai_account(page, email: str, password: str) -> bool:
# 重新点击注册按钮 # 重新点击注册按钮
log.step("重新点击免费注册...") log.step("重新点击免费注册...")
signup_btn = wait_for_element(page, 'css:[data-testid="signup-button"]', timeout=5) or \ signup_btn = wait_for_element(page, 'css:[data-testid="signup-button"]', timeout=8) or \
wait_for_element(page, 'text:免费注册', timeout=3) wait_for_element(page, 'text:免费注册', timeout=5) or \
wait_for_element(page, 'text:Sign up', timeout=3) or \
wait_for_element(page, 'text:Get started', timeout=2)
if signup_btn: if signup_btn:
signup_btn.click() signup_btn.click()
time.sleep(2) time.sleep(3) # 增加等待时间
# 再次检查弹窗 # 再次检查弹窗
login_form = page.ele('css:[data-testid="login-form"]', timeout=3) or \ login_form = page.ele('css:[data-testid="login-form"]', timeout=5) or \
page.ele('text:登录或注册', timeout=2) page.ele('text:登录或注册', timeout=3) or \
page.ele('css:input[type="email"]', timeout=3)
if not login_form: if not login_form:
log.error("重试后弹窗仍然异常,跳过此账号") log.error("重试后弹窗仍然异常,跳过此账号")
save_debug_screenshot(page, 'popup_retry_failed')
return False return False
else: else:
log.error("找不到注册按钮,跳过此账号") log.error("找不到注册按钮,跳过此账号")
save_debug_screenshot(page, 'signup_button_not_found')
return False return False
# 尝试输入邮箱 # 尝试输入邮箱