This commit is contained in:
2026-01-18 05:38:14 +08:00
parent e510c568b4
commit 7e92e12c79
5 changed files with 438 additions and 132 deletions

View File

@@ -41,16 +41,21 @@ def make_progress_bar(current: int, total: int, width: int = 10) -> str:
class ProgressTracker:
"""进度跟踪器 - 用于实时更新 Telegram 消息显示进度"""
def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int):
def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int,
team_index: int = 0, teams_total: int = 0, include_owner: bool = False):
self.bot = bot
self.chat_ids = chat_ids
self.team_name = team_name
self.total = total
self.team_index = team_index # 当前 Team 序号 (从 1 开始)
self.teams_total = teams_total # Team 总数
self.include_owner = include_owner # 是否包含 Owner
self.current = 0
self.success = 0
self.failed = 0
self.current_account = ""
self.current_step = ""
self.current_role = "" # 当前账号角色 (member/owner)
self.messages: Dict[int, Message] = {} # chat_id -> Message
self._last_update = 0
self._update_interval = 2 # 最小更新间隔 (秒)
@@ -60,17 +65,24 @@ class ProgressTracker:
"""生成进度消息文本"""
bar = make_progress_bar(self.current, self.total, 12)
# 标题行:显示 Team 序号
if self.teams_total > 0:
title = f"<b>📦 Team [{self.team_index}/{self.teams_total}]: {self.team_name}</b>"
else:
title = f"<b>📦 正在处理: {self.team_name}</b>"
lines = [
f"<b>正在处理: {self.team_name}</b>",
title,
"",
f"进度: {bar}",
f"账号: {self.current}/{self.total}",
f"账号: {self.current}/{self.total}" + (f" (含 Owner)" if self.include_owner else ""),
f"成功: {self.success} | 失败: {self.failed}",
]
if self.current_account:
lines.append("")
lines.append(f"当前: <code>{self.current_account}</code>")
role_tag = " 👑" if self.current_role == "owner" else ""
lines.append(f"当前: <code>{self.current_account}</code>{role_tag}")
if self.current_step:
lines.append(f"步骤: {self.current_step}")
@@ -105,10 +117,14 @@ class ProgressTracker:
except TelegramError:
pass
def _schedule_update(self):
"""调度消息更新 (限流)"""
def _schedule_update(self, force: bool = False):
"""调度消息更新 (限流)
Args:
force: 是否强制更新 (忽略限流)
"""
now = time.time()
if now - self._last_update < self._update_interval:
if not force and now - self._last_update < self._update_interval:
return
self._last_update = now
@@ -121,7 +137,7 @@ class ProgressTracker:
self._loop = loop
asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop)
def update(self, current: int = None, account: str = None, step: str = None):
def update(self, current: int = None, account: str = None, step: str = None, role: str = None):
"""更新进度 (供同步代码调用)"""
if current is not None:
self.current = current
@@ -129,6 +145,8 @@ class ProgressTracker:
self.current_account = account
if step is not None:
self.current_step = step
if role is not None:
self.current_role = role
self._schedule_update()
@@ -141,13 +159,16 @@ class ProgressTracker:
self.failed += 1
self.current_account = ""
self.current_step = ""
self._schedule_update()
self.current_role = ""
# 最后一个账号完成时强制更新,确保显示 100%
is_last = self.current >= self.total
self._schedule_update(force=is_last)
def finish(self):
"""完成进度跟踪,发送最终状态"""
self.current_step = "已完成!"
if self._loop:
asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)
# 强制更新最终状态
self._schedule_update(force=True)
class BotNotifier:
@@ -197,8 +218,8 @@ class BotNotifier:
text=message,
parse_mode=parse_mode
)
except TelegramError:
pass
except TelegramError as e:
print(f"[BotNotifier] 发送消息失败 (chat_id={chat_id}): {e}")
async def _send_photo_to_all(self, photo_path: str, caption: str = ""):
"""发送图片到所有管理员"""
@@ -232,10 +253,20 @@ class BotNotifier:
"""直接发送通知 (阻塞)"""
await self._send_to_all(message)
def create_progress(self, team_name: str, total: int) -> ProgressTracker:
"""创建进度跟踪器"""
def create_progress(self, team_name: str, total: int, team_index: int = 0,
teams_total: int = 0, include_owner: bool = False) -> ProgressTracker:
"""创建进度跟踪器
Args:
team_name: Team 名称
total: 账号总数
team_index: 当前 Team 序号 (从 1 开始)
teams_total: Team 总数
include_owner: 是否包含 Owner
"""
self._current_progress = ProgressTracker(
self.bot, self.chat_ids, team_name, total
self.bot, self.chat_ids, team_name, total,
team_index=team_index, teams_total=teams_total, include_owner=include_owner
)
if self._loop:
self._current_progress.start(self._loop)
@@ -286,6 +317,56 @@ class BotNotifier:
await self.notify(message)
async def notify_team_completed(self, team_name: str, results: list):
"""通知单个 Team 处理完成
Args:
team_name: Team 名称
results: 处理结果列表 [{"email", "status", ...}]
"""
if not results:
return
success_accounts = [r.get("email") for r in results if r.get("status") == "success"]
failed_accounts = [r.get("email") for r in results if r.get("status") != "success"]
success_count = len(success_accounts)
failed_count = len(failed_accounts)
total = len(results)
# 状态图标
if failed_count == 0:
icon = ""
status = "全部成功"
elif success_count == 0:
icon = ""
status = "全部失败"
else:
icon = "⚠️"
status = f"{failed_count} 个失败"
# 构建消息
message = (
f"<b>{icon} Team 处理完成</b>\n"
f"Team: <code>{team_name}</code>\n"
f"结果: {success_count}/{total} 成功\n"
f"状态: {status}"
)
# 列出成功的账号
if success_accounts:
message += "\n\n<b>✓ 成功:</b>"
for email in success_accounts:
message += f"\n <code>{email}</code>"
# 列出失败的账号
if failed_accounts:
message += "\n\n<b>✗ 失败:</b>"
for email in failed_accounts:
message += f"\n <code>{email}</code>"
await self.notify(message)
async def notify_error(self, message: str, details: str = ""):
"""通知错误"""
if not TELEGRAM_NOTIFY_ON_ERROR:
@@ -343,17 +424,26 @@ def send_screenshot_sync(photo_path: str, caption: str = ""):
# ==================== 进度更新接口 (供 run.py 使用) ====================
def progress_start(team_name: str, total: int) -> Optional[ProgressTracker]:
"""开始进度跟踪"""
def progress_start(team_name: str, total: int, team_index: int = 0,
teams_total: int = 0, include_owner: bool = False) -> Optional[ProgressTracker]:
"""开始进度跟踪
Args:
team_name: Team 名称
total: 账号总数
team_index: 当前 Team 序号 (从 1 开始)
teams_total: Team 总数
include_owner: 是否包含 Owner
"""
if _notifier:
return _notifier.create_progress(team_name, total)
return _notifier.create_progress(team_name, total, team_index, teams_total, include_owner)
return None
def progress_update(account: str = None, step: str = None):
def progress_update(account: str = None, step: str = None, role: str = None):
"""更新当前进度"""
if _notifier and _notifier.get_progress():
_notifier.get_progress().update(account=account, step=step)
_notifier.get_progress().update(account=account, step=step, role=role)
def progress_account_done(email: str, success: bool):
@@ -366,3 +456,13 @@ def progress_finish():
"""完成进度跟踪"""
if _notifier:
_notifier.clear_progress()
def notify_team_completed_sync(team_name: str, results: list):
"""同步方式通知单个 Team 完成 (供 run.py 使用)"""
if _notifier and _notifier._loop:
import asyncio
asyncio.run_coroutine_threadsafe(
_notifier.notify_team_completed(team_name, results),
_notifier._loop
)

View File

@@ -35,6 +35,27 @@ from s2a_service import (
from logger import log
# ==================== 停止检查 ====================
class ShutdownRequested(Exception):
"""停止请求异常 - 用于中断浏览器操作"""
pass
def check_shutdown():
"""检查是否收到停止请求,如果是则抛出异常"""
try:
import run
if run._shutdown_requested:
log.warning("检测到停止请求,中断当前操作...")
raise ShutdownRequested("用户请求停止")
except ImportError:
pass
except ShutdownRequested:
raise
except Exception:
pass
# ==================== 浏览器配置常量 ====================
BROWSER_MAX_RETRIES = 3 # 浏览器启动最大重试次数
BROWSER_RETRY_DELAY = 2 # 重试间隔 (秒)
@@ -198,24 +219,49 @@ def cleanup_chrome_processes():
"""清理残留的 Chrome 进程 (跨平台支持)"""
try:
if platform.system() == "Windows":
# Windows: 使用 tasklist 和 taskkill
result = subprocess.run(
['tasklist', '/FI', 'IMAGENAME eq chrome.exe', '/FO', 'CSV'],
capture_output=True, text=True, timeout=5
)
if 'chrome.exe' in result.stdout:
# Windows: 使用 taskkill 清理 chromedriver 和 chrome
try:
subprocess.run(
['taskkill', '/F', '/IM', 'chromedriver.exe'],
capture_output=True, timeout=5
)
log.step("已清理 chromedriver 残留进程")
except Exception:
pass
# 清理无头模式的 chrome 进程 (带 --headless 参数的)
try:
result = subprocess.run(
['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
capture_output=True, text=True, timeout=5
)
for line in result.stdout.strip().split('\n'):
pid = line.strip()
if pid.isdigit():
subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
except Exception:
pass
log.step("已清理 Chrome 残留进程")
else:
# Linux/Mac: 使用 pkill
subprocess.run(
['pkill', '-f', 'chromedriver'],
capture_output=True, timeout=5
)
try:
subprocess.run(
['pkill', '-f', 'chromedriver'],
capture_output=True, timeout=5
)
except Exception:
pass
# 清理无头模式的 chrome 进程
try:
subprocess.run(
['pkill', '-f', 'chrome.*--headless'],
capture_output=True, timeout=5
)
except Exception:
pass
log.step("已清理 Chrome 残留进程")
except Exception:
pass # 静默处理,不影响主流程
@@ -484,6 +530,9 @@ class BrowserRetryContext:
if not self._should_continue:
break
# 检查停止请求
check_shutdown()
self.current_attempt = attempt
# 非首次尝试时的清理和等待
@@ -495,8 +544,12 @@ class BrowserRetryContext:
# 初始化浏览器
try:
check_shutdown() # 再次检查
self.page = init_browser()
yield attempt
except ShutdownRequested:
self._should_continue = False
raise
except Exception as e:
log.error(f"浏览器初始化失败: {e}")
if attempt >= self.max_retries - 1:
@@ -504,6 +557,11 @@ class BrowserRetryContext:
def handle_error(self, error: Exception):
"""处理错误,决定是否继续重试"""
# 如果是停止请求,直接停止
if isinstance(error, ShutdownRequested):
self._should_continue = False
return
log.error(f"流程异常: {error}")
if self.current_attempt >= self.max_retries - 1:
self._should_continue = False
@@ -550,6 +608,9 @@ def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) -
stable_count = 0
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
# 检查浏览器标签页是否还在加载favicon 旋转动画)
ready_state = page.run_js('return document.readyState', timeout=2)
@@ -567,6 +628,8 @@ def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) -
stable_count = 0
last_html_len = current_len
time.sleep(check_interval)
except ShutdownRequested:
raise
except Exception:
time.sleep(check_interval)
@@ -626,11 +689,16 @@ def wait_for_element(page, selector: str, timeout: int = 10, visible: bool = Tru
start_time = time.time()
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
element = page.ele(selector, timeout=1)
if element:
if not visible or (element.states.is_displayed if hasattr(element, 'states') else True):
return element
except ShutdownRequested:
raise
except Exception:
pass
time.sleep(0.3)
@@ -653,11 +721,16 @@ def wait_for_url_change(page, old_url: str, timeout: int = 15, contains: str = N
start_time = time.time()
while time.time() - start_time < timeout:
# 检查停止请求
check_shutdown()
try:
current_url = page.url
if current_url != old_url:
if contains is None or contains in current_url:
return True
except ShutdownRequested:
raise
except Exception:
pass
time.sleep(0.5)

136
run.py
View File

@@ -26,7 +26,7 @@ from team_service import batch_invite_to_team, print_team_summary, check_availab
from crs_service import crs_add_account, crs_sync_team_owners, crs_verify_token
from cpa_service import cpa_verify_connection
from s2a_service import s2a_verify_connection
from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner
from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested
from utils import (
save_to_csv,
load_team_tracker,
@@ -44,13 +44,14 @@ from logger import log
# 进度更新 (Telegram Bot 使用,导入失败时忽略)
try:
from bot_notifier import progress_start, progress_update, progress_account_done, progress_finish
from bot_notifier import progress_start, progress_update, progress_account_done, progress_finish, notify_team_completed_sync
except ImportError:
# 如果没有 bot_notifier使用空函数
def progress_start(team_name, total): pass
def progress_update(account=None, step=None): pass
def progress_account_done(email, success): pass
def progress_finish(): pass
def notify_team_completed_sync(team_name, results): pass
# ==================== 全局状态 ====================
@@ -96,10 +97,13 @@ if threading.current_thread() is threading.main_thread():
def process_single_team(team: dict) -> tuple[list, list]:
def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) -> tuple[list, list]:
"""处理单个 Team 的完整流程
Args:
team: Team 配置
team_index: 当前 Team 序号 (从 1 开始)
teams_total: Team 总数
Returns:
tuple: (处理结果列表, 待处理的 Owner 列表)
@@ -202,21 +206,42 @@ def process_single_team(team: dict) -> tuple[list, list]:
else:
log.warning(f"Team {team_name} 没有可用席位,无法邀请新成员")
# ========== 阶段 3: 处理普通成员 (注册 + Codex 授权 + CRS) ==========
if invited_accounts:
log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + CRS 入库")
member_results = process_accounts(invited_accounts, team_name)
results.extend(member_results)
# ========== 合并成员和 Owner 一起处理 ==========
all_to_process = invited_accounts.copy()
# Owner 不在这里处理,统一放到所有 Team 处理完后
# 添加未完成的 Owner 到处理列表
from config import INCLUDE_TEAM_OWNERS
include_owner = INCLUDE_TEAM_OWNERS and len(owner_accounts) > 0
if include_owner:
for owner in owner_accounts:
all_to_process.append({
"email": owner["email"],
"password": owner.get("password", DEFAULT_PASSWORD),
"status": owner.get("status", ""),
"role": "owner"
})
log.info(f"包含 {len(owner_accounts)} 个 Owner 账号一起处理")
# ========== 阶段 3: 处理所有账号 (注册 + Codex 授权 + 入库) ==========
if all_to_process:
log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + 入库")
all_results = process_accounts(
all_to_process, team_name,
team_index=team_index, teams_total=teams_total,
include_owner=include_owner
)
results.extend(all_results)
# ========== Team 处理完成 ==========
success_count = sum(1 for r in results if r["status"] == "success")
if results:
log.success(f"{team_name} 成员处理完成: {success_count}/{len(results)} 成功")
log.success(f"{team_name} 处理完成: {success_count}/{len(results)} 成功")
# 发送 Team 完成报告到 Telegram
notify_team_completed_sync(team_name, results)
# 返回未完成的 Owner 列表供后续统一处理
return results, owner_accounts
# 返回空列表,因为 Owner 已经在这里处理
return results, []
def _get_team_by_name(team_name: str) -> dict:
@@ -227,12 +252,16 @@ def _get_team_by_name(team_name: str) -> dict:
return {}
def process_accounts(accounts: list, team_name: str) -> list:
def process_accounts(accounts: list, team_name: str, team_index: int = 0,
teams_total: int = 0, include_owner: bool = False) -> list:
"""处理账号列表 (注册/授权/CRS)
Args:
accounts: 账号列表 [{"email", "password", "status", "role"}]
team_name: Team 名称
team_index: 当前 Team 序号 (从 1 开始)
teams_total: Team 总数
include_owner: 是否包含 Owner
Returns:
list: 处理结果
@@ -242,7 +271,7 @@ def process_accounts(accounts: list, team_name: str) -> list:
results = []
# 启动进度跟踪 (Telegram Bot)
progress_start(team_name, len(accounts))
progress_start(team_name, len(accounts), team_index, teams_total, include_owner)
for i, account in enumerate(accounts):
if _shutdown_requested:
@@ -291,7 +320,7 @@ def process_accounts(accounts: list, team_name: str) -> list:
log.separator("#", 50)
# 更新进度: 当前账号
progress_update(account=email, step="Starting...")
progress_update(account=email, step="Starting...", role=role)
result = {
"team": team_name,
@@ -331,10 +360,11 @@ def process_accounts(accounts: list, team_name: str) -> list:
update_account_status(_tracker, team_name, email, "processing")
save_team_tracker(_tracker)
with Timer(f"账号 {email}"):
if is_team_owner_otp:
# 旧格式 Team Owner: 使用 OTP 登录授权
log.info("Team Owner 账号 (旧格式),使用一次性验证码登录...", icon="auth")
try:
with Timer(f"账号 {email}"):
if is_team_owner_otp:
# 旧格式 Team Owner: 使用 OTP 登录授权
log.info("Team Owner 账号 (旧格式),使用一次性验证码登录...", icon="auth")
progress_update(step="OTP Login...")
auth_success, codex_data = login_and_authorize_with_otp(email)
register_success = auth_success
@@ -469,6 +499,13 @@ def process_accounts(accounts: list, team_name: str) -> list:
update_account_status(_tracker, team_name, email, "register_failed")
save_team_tracker(_tracker)
except ShutdownRequested:
# 用户请求停止,保存当前状态并退出
log.warning(f"用户请求停止,当前账号: {email}")
# 不改变账号状态,保持中断前的状态,下次继续处理
save_team_tracker(_tracker)
break
# 保存到 CSV
save_to_csv(
email=email,
@@ -514,10 +551,10 @@ def run_all_teams():
log.warning(f"发现 {total_incomplete} 个未完成账号,将优先处理")
_current_results = []
all_pending_owners = [] # 收集所有待处理的 Owner
teams_total = len(TEAMS)
with Timer("全部流程"):
# ========== 第一阶段: 处理所有 Team 的普通成员 ==========
# ========== 处理所有 Team (成员 + Owner 一起) ==========
for i, team in enumerate(TEAMS):
if _shutdown_requested:
log.warning("检测到中断请求,停止处理...")
@@ -525,51 +562,18 @@ def run_all_teams():
log.separator("", 60)
team_email = team.get('account') or team.get('owner_email', '')
log.highlight(f"Team {i + 1}/{len(TEAMS)}: {team['name']} ({team_email})", icon="team")
log.highlight(f"Team {i + 1}/{teams_total}: {team['name']} ({team_email})", icon="team")
log.separator("", 60)
results, pending_owners = process_single_team(team)
# 收集待处理的 Owner
if pending_owners:
for owner in pending_owners:
all_pending_owners.append({
"team_name": team["name"],
"email": owner["email"],
"password": owner.get("password", DEFAULT_PASSWORD),
"status": owner.get("status", "team_owner"),
"role": "owner"
})
# 传递 Team 序号信息
results, _ = process_single_team(team, team_index=i + 1, teams_total=teams_total)
_current_results.extend(results)
# Team 之间的间隔
if i < len(TEAMS) - 1 and not _shutdown_requested:
if i < teams_total - 1 and not _shutdown_requested:
wait_time = 3
log.countdown(wait_time, "下一个 Team")
# ========== 第二阶段: 统一处理所有 Team Owner 的 CRS 授权 ==========
if all_pending_owners and not _shutdown_requested:
log.separator("", 60)
log.header(f"统一处理 Team Owner CRS 授权 ({len(all_pending_owners)} 个)")
log.separator("", 60)
for i, owner in enumerate(all_pending_owners):
if _shutdown_requested:
log.warning("检测到中断请求,停止处理...")
break
log.separator("#", 50)
log.info(f"Owner {i + 1}/{len(all_pending_owners)}: {owner['email']} ({owner['team_name']})", icon="account")
log.separator("#", 50)
owner_results = process_accounts([owner], owner["team_name"])
_current_results.extend(owner_results)
# Owner 之间的间隔
if i < len(all_pending_owners) - 1 and not _shutdown_requested:
wait_time = random.randint(5, 15)
log.info(f"等待 {wait_time}s 后处理下一个 Owner...", icon="wait")
time.sleep(wait_time)
# 打印总结
print_summary(_current_results)
@@ -592,22 +596,10 @@ def run_single_team(team_index: int = 0):
log.info(f"单 Team 模式: {team['name']}", icon="start")
_current_results = []
results, pending_owners = process_single_team(team)
# 单 Team 模式:序号为 1/1
results, _ = process_single_team(team, team_index=1, teams_total=1)
_current_results.extend(results)
# 单 Team 模式下也处理 Owner
if pending_owners:
log.section(f"处理 Team Owner ({len(pending_owners)} 个)")
for owner in pending_owners:
owner_data = {
"email": owner["email"],
"password": owner.get("password", DEFAULT_PASSWORD),
"status": owner.get("status", "team_owner"),
"role": "owner"
}
owner_results = process_accounts([owner_data], team["name"])
_current_results.extend(owner_results)
print_summary(_current_results)
return _current_results

View File

@@ -113,7 +113,7 @@ def preload_all_account_ids() -> tuple[int, int]:
Returns:
tuple: (success_count, fail_count)
"""
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
import sys
success_count = 0
fail_count = 0
@@ -127,42 +127,71 @@ def preload_all_account_ids() -> tuple[int, int]:
log.success(f"所有 Team account_id 已缓存 ({len(teams_with_token)} 个)")
return len(teams_with_token), 0
total = len(teams_with_token)
log.info(f"预加载 {len(teams_need_fetch)} 个 Team 的 account_id...", icon="sync")
need_save = False
failed_teams = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
TextColumn("{task.fields[status]}"),
) as progress:
task = progress.add_task("加载中", total=len(teams_with_token), status="")
# 检测是否为 TTY 环境
is_tty = sys.stdout.isatty()
for team in teams_with_token:
progress.update(task, description=f"[cyan]{team['name']}", status="")
if is_tty:
# TTY 环境: 使用 rich 进度条
from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TaskProgressColumn(),
TextColumn("{task.fields[status]}"),
) as progress:
task = progress.add_task("加载中", total=total, status="")
for team in teams_with_token:
progress.update(task, description=f"[cyan]{team['name']}", status="")
if team.get("account_id"):
success_count += 1
progress.update(task, advance=1, status="[green]✓ 已缓存")
continue
account_id = fetch_account_id(team, silent=True)
if account_id:
success_count += 1
progress.update(task, advance=1, status="[green]✓")
if team.get("format") == "new":
need_save = True
else:
fail_count += 1
failed_teams.append(team['name'])
progress.update(task, advance=1, status="[red]✗")
else:
# 非 TTY 环境 (systemd/journalctl): 使用普通日志输出
for idx, team in enumerate(teams_with_token, 1):
team_name = team['name']
if team.get("account_id"):
success_count += 1
progress.update(task, advance=1, status="[green]✓ 已缓存")
log.info(f"预加载 [{idx}/{total}] {team_name}: ✓ 已缓存")
continue
account_id = fetch_account_id(team, silent=True)
if account_id:
success_count += 1
progress.update(task, advance=1, status="[green]✓")
log.info(f"预加载 [{idx}/{total}] {team_name}: ✓ {account_id}")
if team.get("format") == "new":
need_save = True
else:
fail_count += 1
failed_teams.append(team['name'])
progress.update(task, advance=1, status="[red]✗")
failed_teams.append(team_name)
log.warning(f"预加载 [{idx}/{total}] {team_name}: ✗ 失败")
# 输出失败的 team
for name in failed_teams:
log.warning(f"Team {name}: 获取 account_id 失败")
# 输出失败的 team (仅 TTY 环境,非 TTY 已在循环中输出)
if is_tty:
for name in failed_teams:
log.warning(f"Team {name}: 获取 account_id 失败")
# 持久化到 team.json
if need_save:

View File

@@ -44,7 +44,7 @@ from config import (
S2A_GROUP_IDS,
S2A_ADMIN_KEY,
)
from utils import load_team_tracker
from utils import load_team_tracker, get_all_incomplete_accounts
from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats
from email_service import gptmail_service, unified_create_email
@@ -98,6 +98,7 @@ class ProvisionerBot:
("fingerprint", self.cmd_fingerprint),
("run", self.cmd_run),
("run_all", self.cmd_run_all),
("resume", self.cmd_resume),
("stop", self.cmd_stop),
("logs", self.cmd_logs),
("logs_live", self.cmd_logs_live),
@@ -173,6 +174,7 @@ class ProvisionerBot:
BotCommand("config", "查看系统配置"),
BotCommand("run", "处理指定 Team"),
BotCommand("run_all", "处理所有 Team"),
BotCommand("resume", "继续处理未完成账号"),
BotCommand("stop", "停止当前任务"),
BotCommand("logs", "查看最近日志"),
BotCommand("dashboard", "查看 S2A 仪表盘"),
@@ -209,6 +211,7 @@ class ProvisionerBot:
<b>🚀 任务控制:</b>
/run &lt;n&gt; - 开始处理第 n 个 Team
/run_all - 开始处理所有 Team
/resume - 继续处理未完成账号
/stop - 停止当前任务
<b>⚙️ 配置管理:</b>
@@ -729,6 +732,53 @@ class ProvisionerBot:
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部"))
@admin_only
async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
"""继续处理未完成的账号"""
if self.current_task and not self.current_task.done():
await update.message.reply_text(
f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
)
return
# 检查是否有未完成的账号
tracker = load_team_tracker()
all_incomplete = get_all_incomplete_accounts(tracker)
if not all_incomplete:
await update.message.reply_text("✅ 没有待处理的账号,所有任务已完成")
return
# 统计未完成账号
total_incomplete = sum(len(accs) for accs in all_incomplete.values())
teams_count = len(all_incomplete)
# 构建消息
lines = [
f"<b>⏳ 发现 {total_incomplete} 个未完成账号</b>",
f"涉及 {teams_count} 个 Team:",
""
]
for team_name, accounts in all_incomplete.items():
lines.append(f" • <b>{team_name}</b>: {len(accounts)}")
lines.append("")
lines.append("🚀 开始继续处理...")
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
# 启动任务 (run_all_teams 会自动处理未完成的账号)
self.current_team = "继续处理"
loop = asyncio.get_event_loop()
self.current_task = loop.run_in_executor(
self.executor,
self._run_all_teams_task
)
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "继续处理"))
async def _wrap_task(self, task, team_name: str):
"""包装任务以处理完成通知"""
try:
@@ -736,8 +786,10 @@ class ProvisionerBot:
# 收集成功和失败的账号
success_accounts = [r.get("email") for r in (result or []) if r.get("status") == "success"]
failed_accounts = [r.get("email") for r in (result or []) if r.get("status") != "success"]
log.info(f"任务完成: {team_name}, 成功: {len(success_accounts)}, 失败: {len(failed_accounts)}")
await self.notifier.notify_task_completed(team_name, success_accounts, failed_accounts)
except Exception as e:
log.error(f"任务异常: {team_name}, 错误: {e}")
await self.notifier.notify_error(f"任务失败: {team_name}", str(e))
finally:
self.current_team = None
@@ -748,11 +800,31 @@ class ProvisionerBot:
"""执行单个 Team 任务 (在线程池中运行)"""
# 延迟导入避免循环依赖
from run import run_single_team
from team_service import preload_all_account_ids
from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
from config import DEFAULT_PASSWORD
# 预加载 account_id
preload_all_account_ids()
_tracker = load_team_tracker()
add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
save_team_tracker(_tracker)
return run_single_team(team_idx)
def _run_all_teams_task(self):
"""执行所有 Team 任务 (在线程池中运行)"""
from run import run_all_teams
from team_service import preload_all_account_ids
from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
from config import DEFAULT_PASSWORD
# 预加载 account_id
preload_all_account_ids()
_tracker = load_team_tracker()
add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
save_team_tracker(_tracker)
return run_all_teams()
@admin_only
@@ -770,8 +842,10 @@ class ProvisionerBot:
try:
import run
run._shutdown_requested = True
# 获取当前运行结果
current_results = run._current_results.copy() if run._current_results else []
except Exception:
pass
current_results = []
# 2. 取消 asyncio 任务
if self.current_task and not self.current_task.done():
@@ -797,13 +871,51 @@ class ProvisionerBot:
# 清理进度跟踪
progress_finish()
await update.message.reply_text(
f"<b>✅ 任务已强制停止</b>\n\n"
f"已停止: {task_name}\n"
f"已清理浏览器进程\n\n"
f"使用 /status 查看状态",
parse_mode="HTML"
)
# 6. 生成停止报告
report_lines = [
f"<b>🛑 任务已停止</b>",
f"任务: {task_name}",
"",
]
# 本次运行结果
if current_results:
success_count = sum(1 for r in current_results if r.get("status") == "success")
failed_count = len(current_results) - success_count
report_lines.append(f"<b>📊 本次运行结果:</b>")
report_lines.append(f" 成功: {success_count}")
report_lines.append(f" 失败: {failed_count}")
report_lines.append("")
# 获取未完成账号信息
tracker = load_team_tracker()
all_incomplete = get_all_incomplete_accounts(tracker)
if all_incomplete:
total_incomplete = sum(len(accs) for accs in all_incomplete.values())
report_lines.append(f"<b>⏳ 待继续处理: {total_incomplete} 个账号</b>")
# 显示每个 Team 的未完成账号 (最多显示 3 个 Team)
shown_teams = 0
for team_name, accounts in all_incomplete.items():
if shown_teams >= 3:
remaining = len(all_incomplete) - 3
report_lines.append(f" ... 还有 {remaining} 个 Team")
break
report_lines.append(f" <b>{team_name}</b>: {len(accounts)}")
# 显示第一个待处理账号
if accounts:
first_acc = accounts[0]
report_lines.append(f" 下一个: <code>{first_acc['email']}</code>")
shown_teams += 1
report_lines.append("")
report_lines.append("💡 使用 /resume 继续处理")
else:
report_lines.append("✅ 没有待处理的账号")
await update.message.reply_text("\n".join(report_lines), parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ 停止任务时出错: {e}")