diff --git a/bot_notifier.py b/bot_notifier.py
index ee03b27..7b3284d 100644
--- a/bot_notifier.py
+++ b/bot_notifier.py
@@ -41,16 +41,21 @@ def make_progress_bar(current: int, total: int, width: int = 10) -> str:
class ProgressTracker:
"""进度跟踪器 - 用于实时更新 Telegram 消息显示进度"""
- def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int):
+ def __init__(self, bot: Bot, chat_ids: List[int], team_name: str, total: int,
+ team_index: int = 0, teams_total: int = 0, include_owner: bool = False):
self.bot = bot
self.chat_ids = chat_ids
self.team_name = team_name
self.total = total
+ self.team_index = team_index # 当前 Team 序号 (从 1 开始)
+ self.teams_total = teams_total # Team 总数
+ self.include_owner = include_owner # 是否包含 Owner
self.current = 0
self.success = 0
self.failed = 0
self.current_account = ""
self.current_step = ""
+ self.current_role = "" # 当前账号角色 (member/owner)
self.messages: Dict[int, Message] = {} # chat_id -> Message
self._last_update = 0
self._update_interval = 2 # 最小更新间隔 (秒)
@@ -60,17 +65,24 @@ class ProgressTracker:
"""生成进度消息文本"""
bar = make_progress_bar(self.current, self.total, 12)
+ # 标题行:显示 Team 序号
+ if self.teams_total > 0:
+ title = f"📦 Team [{self.team_index}/{self.teams_total}]: {self.team_name}"
+ else:
+ title = f"📦 正在处理: {self.team_name}"
+
lines = [
- f"正在处理: {self.team_name}",
+ title,
"",
f"进度: {bar}",
- f"账号: {self.current}/{self.total}",
+ f"账号: {self.current}/{self.total}" + (f" (含 Owner)" if self.include_owner else ""),
f"成功: {self.success} | 失败: {self.failed}",
]
if self.current_account:
lines.append("")
- lines.append(f"当前: {self.current_account}")
+ role_tag = " 👑" if self.current_role == "owner" else ""
+ lines.append(f"当前: {self.current_account}{role_tag}")
if self.current_step:
lines.append(f"步骤: {self.current_step}")
@@ -105,10 +117,14 @@ class ProgressTracker:
except TelegramError:
pass
- def _schedule_update(self):
- """调度消息更新 (限流)"""
+ def _schedule_update(self, force: bool = False):
+ """调度消息更新 (限流)
+
+ Args:
+ force: 是否强制更新 (忽略限流)
+ """
now = time.time()
- if now - self._last_update < self._update_interval:
+ if not force and now - self._last_update < self._update_interval:
return
self._last_update = now
@@ -121,7 +137,7 @@ class ProgressTracker:
self._loop = loop
asyncio.run_coroutine_threadsafe(self._send_initial_message(), loop)
- def update(self, current: int = None, account: str = None, step: str = None):
+ def update(self, current: int = None, account: str = None, step: str = None, role: str = None):
"""更新进度 (供同步代码调用)"""
if current is not None:
self.current = current
@@ -129,6 +145,8 @@ class ProgressTracker:
self.current_account = account
if step is not None:
self.current_step = step
+ if role is not None:
+ self.current_role = role
self._schedule_update()
@@ -141,13 +159,16 @@ class ProgressTracker:
self.failed += 1
self.current_account = ""
self.current_step = ""
- self._schedule_update()
+ self.current_role = ""
+ # 最后一个账号完成时强制更新,确保显示 100%
+ is_last = self.current >= self.total
+ self._schedule_update(force=is_last)
def finish(self):
"""完成进度跟踪,发送最终状态"""
self.current_step = "已完成!"
- if self._loop:
- asyncio.run_coroutine_threadsafe(self._update_messages(), self._loop)
+ # 强制更新最终状态
+ self._schedule_update(force=True)
class BotNotifier:
@@ -197,8 +218,8 @@ class BotNotifier:
text=message,
parse_mode=parse_mode
)
- except TelegramError:
- pass
+ except TelegramError as e:
+ print(f"[BotNotifier] 发送消息失败 (chat_id={chat_id}): {e}")
async def _send_photo_to_all(self, photo_path: str, caption: str = ""):
"""发送图片到所有管理员"""
@@ -232,10 +253,20 @@ class BotNotifier:
"""直接发送通知 (阻塞)"""
await self._send_to_all(message)
- def create_progress(self, team_name: str, total: int) -> ProgressTracker:
- """创建进度跟踪器"""
+ def create_progress(self, team_name: str, total: int, team_index: int = 0,
+ teams_total: int = 0, include_owner: bool = False) -> ProgressTracker:
+ """创建进度跟踪器
+
+ Args:
+ team_name: Team 名称
+ total: 账号总数
+ team_index: 当前 Team 序号 (从 1 开始)
+ teams_total: Team 总数
+ include_owner: 是否包含 Owner
+ """
self._current_progress = ProgressTracker(
- self.bot, self.chat_ids, team_name, total
+ self.bot, self.chat_ids, team_name, total,
+ team_index=team_index, teams_total=teams_total, include_owner=include_owner
)
if self._loop:
self._current_progress.start(self._loop)
@@ -286,6 +317,56 @@ class BotNotifier:
await self.notify(message)
+ async def notify_team_completed(self, team_name: str, results: list):
+ """通知单个 Team 处理完成
+
+ Args:
+ team_name: Team 名称
+ results: 处理结果列表 [{"email", "status", ...}]
+ """
+ if not results:
+ return
+
+ success_accounts = [r.get("email") for r in results if r.get("status") == "success"]
+ failed_accounts = [r.get("email") for r in results if r.get("status") != "success"]
+
+ success_count = len(success_accounts)
+ failed_count = len(failed_accounts)
+ total = len(results)
+
+ # 状态图标
+ if failed_count == 0:
+ icon = "✅"
+ status = "全部成功"
+ elif success_count == 0:
+ icon = "❌"
+ status = "全部失败"
+ else:
+ icon = "⚠️"
+ status = f"{failed_count} 个失败"
+
+ # 构建消息
+ message = (
+ f"{icon} Team 处理完成\n"
+ f"Team: {team_name}\n"
+ f"结果: {success_count}/{total} 成功\n"
+ f"状态: {status}"
+ )
+
+ # 列出成功的账号
+ if success_accounts:
+ message += "\n\n✓ 成功:"
+ for email in success_accounts:
+ message += f"\n {email}"
+
+ # 列出失败的账号
+ if failed_accounts:
+ message += "\n\n✗ 失败:"
+ for email in failed_accounts:
+ message += f"\n {email}"
+
+ await self.notify(message)
+
async def notify_error(self, message: str, details: str = ""):
"""通知错误"""
if not TELEGRAM_NOTIFY_ON_ERROR:
@@ -343,17 +424,26 @@ def send_screenshot_sync(photo_path: str, caption: str = ""):
# ==================== 进度更新接口 (供 run.py 使用) ====================
-def progress_start(team_name: str, total: int) -> Optional[ProgressTracker]:
- """开始进度跟踪"""
+def progress_start(team_name: str, total: int, team_index: int = 0,
+ teams_total: int = 0, include_owner: bool = False) -> Optional[ProgressTracker]:
+ """开始进度跟踪
+
+ Args:
+ team_name: Team 名称
+ total: 账号总数
+ team_index: 当前 Team 序号 (从 1 开始)
+ teams_total: Team 总数
+ include_owner: 是否包含 Owner
+ """
if _notifier:
- return _notifier.create_progress(team_name, total)
+ return _notifier.create_progress(team_name, total, team_index, teams_total, include_owner)
return None
-def progress_update(account: str = None, step: str = None):
+def progress_update(account: str = None, step: str = None, role: str = None):
"""更新当前进度"""
if _notifier and _notifier.get_progress():
- _notifier.get_progress().update(account=account, step=step)
+ _notifier.get_progress().update(account=account, step=step, role=role)
def progress_account_done(email: str, success: bool):
@@ -366,3 +456,13 @@ def progress_finish():
"""完成进度跟踪"""
if _notifier:
_notifier.clear_progress()
+
+
+def notify_team_completed_sync(team_name: str, results: list):
+ """同步方式通知单个 Team 完成 (供 run.py 使用)"""
+ if _notifier and _notifier._loop:
+ import asyncio
+ asyncio.run_coroutine_threadsafe(
+ _notifier.notify_team_completed(team_name, results),
+ _notifier._loop
+ )
diff --git a/browser_automation.py b/browser_automation.py
index 0748054..7e5f229 100644
--- a/browser_automation.py
+++ b/browser_automation.py
@@ -35,6 +35,27 @@ from s2a_service import (
from logger import log
+# ==================== 停止检查 ====================
+class ShutdownRequested(Exception):
+ """停止请求异常 - 用于中断浏览器操作"""
+ pass
+
+
+def check_shutdown():
+ """检查是否收到停止请求,如果是则抛出异常"""
+ try:
+ import run
+ if run._shutdown_requested:
+ log.warning("检测到停止请求,中断当前操作...")
+ raise ShutdownRequested("用户请求停止")
+ except ImportError:
+ pass
+ except ShutdownRequested:
+ raise
+ except Exception:
+ pass
+
+
# ==================== 浏览器配置常量 ====================
BROWSER_MAX_RETRIES = 3 # 浏览器启动最大重试次数
BROWSER_RETRY_DELAY = 2 # 重试间隔 (秒)
@@ -198,24 +219,49 @@ def cleanup_chrome_processes():
"""清理残留的 Chrome 进程 (跨平台支持)"""
try:
if platform.system() == "Windows":
- # Windows: 使用 tasklist 和 taskkill
- result = subprocess.run(
- ['tasklist', '/FI', 'IMAGENAME eq chrome.exe', '/FO', 'CSV'],
- capture_output=True, text=True, timeout=5
- )
-
- if 'chrome.exe' in result.stdout:
+ # Windows: 使用 taskkill 清理 chromedriver 和 chrome
+ try:
subprocess.run(
['taskkill', '/F', '/IM', 'chromedriver.exe'],
capture_output=True, timeout=5
)
- log.step("已清理 chromedriver 残留进程")
+ except Exception:
+ pass
+
+ # 清理无头模式的 chrome 进程 (带 --headless 参数的)
+ try:
+ result = subprocess.run(
+ ['wmic', 'process', 'where', "name='chrome.exe' and commandline like '%--headless%'", 'get', 'processid'],
+ capture_output=True, text=True, timeout=5
+ )
+ for line in result.stdout.strip().split('\n'):
+ pid = line.strip()
+ if pid.isdigit():
+ subprocess.run(['taskkill', '/F', '/PID', pid], capture_output=True, timeout=5)
+ except Exception:
+ pass
+
+ log.step("已清理 Chrome 残留进程")
else:
# Linux/Mac: 使用 pkill
- subprocess.run(
- ['pkill', '-f', 'chromedriver'],
- capture_output=True, timeout=5
- )
+ try:
+ subprocess.run(
+ ['pkill', '-f', 'chromedriver'],
+ capture_output=True, timeout=5
+ )
+ except Exception:
+ pass
+
+ # 清理无头模式的 chrome 进程
+ try:
+ subprocess.run(
+ ['pkill', '-f', 'chrome.*--headless'],
+ capture_output=True, timeout=5
+ )
+ except Exception:
+ pass
+
+ log.step("已清理 Chrome 残留进程")
except Exception:
pass # 静默处理,不影响主流程
@@ -484,6 +530,9 @@ class BrowserRetryContext:
if not self._should_continue:
break
+ # 检查停止请求
+ check_shutdown()
+
self.current_attempt = attempt
# 非首次尝试时的清理和等待
@@ -495,8 +544,12 @@ class BrowserRetryContext:
# 初始化浏览器
try:
+ check_shutdown() # 再次检查
self.page = init_browser()
yield attempt
+ except ShutdownRequested:
+ self._should_continue = False
+ raise
except Exception as e:
log.error(f"浏览器初始化失败: {e}")
if attempt >= self.max_retries - 1:
@@ -504,6 +557,11 @@ class BrowserRetryContext:
def handle_error(self, error: Exception):
"""处理错误,决定是否继续重试"""
+ # 如果是停止请求,直接停止
+ if isinstance(error, ShutdownRequested):
+ self._should_continue = False
+ return
+
log.error(f"流程异常: {error}")
if self.current_attempt >= self.max_retries - 1:
self._should_continue = False
@@ -550,6 +608,9 @@ def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) -
stable_count = 0
while time.time() - start_time < timeout:
+ # 检查停止请求
+ check_shutdown()
+
try:
# 检查浏览器标签页是否还在加载(favicon 旋转动画)
ready_state = page.run_js('return document.readyState', timeout=2)
@@ -567,6 +628,8 @@ def wait_for_page_stable(page, timeout: int = 10, check_interval: float = 0.5) -
stable_count = 0
last_html_len = current_len
time.sleep(check_interval)
+ except ShutdownRequested:
+ raise
except Exception:
time.sleep(check_interval)
@@ -626,11 +689,16 @@ def wait_for_element(page, selector: str, timeout: int = 10, visible: bool = Tru
start_time = time.time()
while time.time() - start_time < timeout:
+ # 检查停止请求
+ check_shutdown()
+
try:
element = page.ele(selector, timeout=1)
if element:
if not visible or (element.states.is_displayed if hasattr(element, 'states') else True):
return element
+ except ShutdownRequested:
+ raise
except Exception:
pass
time.sleep(0.3)
@@ -653,11 +721,16 @@ def wait_for_url_change(page, old_url: str, timeout: int = 15, contains: str = N
start_time = time.time()
while time.time() - start_time < timeout:
+ # 检查停止请求
+ check_shutdown()
+
try:
current_url = page.url
if current_url != old_url:
if contains is None or contains in current_url:
return True
+ except ShutdownRequested:
+ raise
except Exception:
pass
time.sleep(0.5)
diff --git a/run.py b/run.py
index 0b9671c..91d88ac 100644
--- a/run.py
+++ b/run.py
@@ -26,7 +26,7 @@ from team_service import batch_invite_to_team, print_team_summary, check_availab
from crs_service import crs_add_account, crs_sync_team_owners, crs_verify_token
from cpa_service import cpa_verify_connection
from s2a_service import s2a_verify_connection
-from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner
+from browser_automation import register_and_authorize, login_and_authorize_with_otp, authorize_only, login_and_authorize_team_owner, ShutdownRequested
from utils import (
save_to_csv,
load_team_tracker,
@@ -44,13 +44,14 @@ from logger import log
# 进度更新 (Telegram Bot 使用,导入失败时忽略)
try:
- from bot_notifier import progress_start, progress_update, progress_account_done, progress_finish
+ from bot_notifier import progress_start, progress_update, progress_account_done, progress_finish, notify_team_completed_sync
except ImportError:
# 如果没有 bot_notifier,使用空函数
def progress_start(team_name, total): pass
def progress_update(account=None, step=None): pass
def progress_account_done(email, success): pass
def progress_finish(): pass
+ def notify_team_completed_sync(team_name, results): pass
# ==================== 全局状态 ====================
@@ -96,10 +97,13 @@ if threading.current_thread() is threading.main_thread():
def process_single_team(team: dict) -> tuple[list, list]:
+def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) -> tuple[list, list]:
"""处理单个 Team 的完整流程
Args:
team: Team 配置
+ team_index: 当前 Team 序号 (从 1 开始)
+ teams_total: Team 总数
Returns:
tuple: (处理结果列表, 待处理的 Owner 列表)
@@ -202,21 +206,42 @@ def process_single_team(team: dict) -> tuple[list, list]:
else:
log.warning(f"Team {team_name} 没有可用席位,无法邀请新成员")
- # ========== 阶段 3: 处理普通成员 (注册 + Codex 授权 + CRS) ==========
- if invited_accounts:
- log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + CRS 入库")
- member_results = process_accounts(invited_accounts, team_name)
- results.extend(member_results)
+ # ========== 合并成员和 Owner 一起处理 ==========
+ all_to_process = invited_accounts.copy()
+
+ # 添加未完成的 Owner 到处理列表
+ from config import INCLUDE_TEAM_OWNERS
+ include_owner = INCLUDE_TEAM_OWNERS and len(owner_accounts) > 0
+
+ if include_owner:
+ for owner in owner_accounts:
+ all_to_process.append({
+ "email": owner["email"],
+ "password": owner.get("password", DEFAULT_PASSWORD),
+ "status": owner.get("status", ""),
+ "role": "owner"
+ })
+ log.info(f"包含 {len(owner_accounts)} 个 Owner 账号一起处理")
- # Owner 不在这里处理,统一放到所有 Team 处理完后
+ # ========== 阶段 3: 处理所有账号 (注册 + Codex 授权 + 入库) ==========
+ if all_to_process:
+ log.section(f"阶段 3: 逐个注册 OpenAI + Codex 授权 + 入库")
+ all_results = process_accounts(
+ all_to_process, team_name,
+ team_index=team_index, teams_total=teams_total,
+ include_owner=include_owner
+ )
+ results.extend(all_results)
# ========== Team 处理完成 ==========
success_count = sum(1 for r in results if r["status"] == "success")
if results:
- log.success(f"{team_name} 成员处理完成: {success_count}/{len(results)} 成功")
+ log.success(f"{team_name} 处理完成: {success_count}/{len(results)} 成功")
+ # 发送 Team 完成报告到 Telegram
+ notify_team_completed_sync(team_name, results)
- # 返回未完成的 Owner 列表供后续统一处理
- return results, owner_accounts
+ # 返回空列表,因为 Owner 已经在这里处理了
+ return results, []
def _get_team_by_name(team_name: str) -> dict:
@@ -227,12 +252,16 @@ def _get_team_by_name(team_name: str) -> dict:
return {}
-def process_accounts(accounts: list, team_name: str) -> list:
+def process_accounts(accounts: list, team_name: str, team_index: int = 0,
+ teams_total: int = 0, include_owner: bool = False) -> list:
"""处理账号列表 (注册/授权/CRS)
Args:
accounts: 账号列表 [{"email", "password", "status", "role"}]
team_name: Team 名称
+ team_index: 当前 Team 序号 (从 1 开始)
+ teams_total: Team 总数
+ include_owner: 是否包含 Owner
Returns:
list: 处理结果
@@ -242,7 +271,7 @@ def process_accounts(accounts: list, team_name: str) -> list:
results = []
# 启动进度跟踪 (Telegram Bot)
- progress_start(team_name, len(accounts))
+ progress_start(team_name, len(accounts), team_index, teams_total, include_owner)
for i, account in enumerate(accounts):
if _shutdown_requested:
@@ -291,7 +320,7 @@ def process_accounts(accounts: list, team_name: str) -> list:
log.separator("#", 50)
# 更新进度: 当前账号
- progress_update(account=email, step="Starting...")
+ progress_update(account=email, step="Starting...", role=role)
result = {
"team": team_name,
@@ -331,10 +360,11 @@ def process_accounts(accounts: list, team_name: str) -> list:
update_account_status(_tracker, team_name, email, "processing")
save_team_tracker(_tracker)
- with Timer(f"账号 {email}"):
- if is_team_owner_otp:
- # 旧格式 Team Owner: 使用 OTP 登录授权
- log.info("Team Owner 账号 (旧格式),使用一次性验证码登录...", icon="auth")
+ try:
+ with Timer(f"账号 {email}"):
+ if is_team_owner_otp:
+ # 旧格式 Team Owner: 使用 OTP 登录授权
+ log.info("Team Owner 账号 (旧格式),使用一次性验证码登录...", icon="auth")
progress_update(step="OTP Login...")
auth_success, codex_data = login_and_authorize_with_otp(email)
register_success = auth_success
@@ -469,6 +499,13 @@ def process_accounts(accounts: list, team_name: str) -> list:
update_account_status(_tracker, team_name, email, "register_failed")
save_team_tracker(_tracker)
+ except ShutdownRequested:
+ # 用户请求停止,保存当前状态并退出
+ log.warning(f"用户请求停止,当前账号: {email}")
+ # 不改变账号状态,保持中断前的状态,下次继续处理
+ save_team_tracker(_tracker)
+ break
+
# 保存到 CSV
save_to_csv(
email=email,
@@ -514,10 +551,10 @@ def run_all_teams():
log.warning(f"发现 {total_incomplete} 个未完成账号,将优先处理")
_current_results = []
- all_pending_owners = [] # 收集所有待处理的 Owner
+ teams_total = len(TEAMS)
with Timer("全部流程"):
- # ========== 第一阶段: 处理所有 Team 的普通成员 ==========
+ # ========== 处理所有 Team (成员 + Owner 一起) ==========
for i, team in enumerate(TEAMS):
if _shutdown_requested:
log.warning("检测到中断请求,停止处理...")
@@ -525,51 +562,18 @@ def run_all_teams():
log.separator("★", 60)
team_email = team.get('account') or team.get('owner_email', '')
- log.highlight(f"Team {i + 1}/{len(TEAMS)}: {team['name']} ({team_email})", icon="team")
+ log.highlight(f"Team {i + 1}/{teams_total}: {team['name']} ({team_email})", icon="team")
log.separator("★", 60)
- results, pending_owners = process_single_team(team)
-
- # 收集待处理的 Owner
- if pending_owners:
- for owner in pending_owners:
- all_pending_owners.append({
- "team_name": team["name"],
- "email": owner["email"],
- "password": owner.get("password", DEFAULT_PASSWORD),
- "status": owner.get("status", "team_owner"),
- "role": "owner"
- })
+ # 传递 Team 序号信息
+ results, _ = process_single_team(team, team_index=i + 1, teams_total=teams_total)
+ _current_results.extend(results)
# Team 之间的间隔
- if i < len(TEAMS) - 1 and not _shutdown_requested:
+ if i < teams_total - 1 and not _shutdown_requested:
wait_time = 3
log.countdown(wait_time, "下一个 Team")
- # ========== 第二阶段: 统一处理所有 Team Owner 的 CRS 授权 ==========
- if all_pending_owners and not _shutdown_requested:
- log.separator("★", 60)
- log.header(f"统一处理 Team Owner CRS 授权 ({len(all_pending_owners)} 个)")
- log.separator("★", 60)
-
- for i, owner in enumerate(all_pending_owners):
- if _shutdown_requested:
- log.warning("检测到中断请求,停止处理...")
- break
-
- log.separator("#", 50)
- log.info(f"Owner {i + 1}/{len(all_pending_owners)}: {owner['email']} ({owner['team_name']})", icon="account")
- log.separator("#", 50)
-
- owner_results = process_accounts([owner], owner["team_name"])
- _current_results.extend(owner_results)
-
- # Owner 之间的间隔
- if i < len(all_pending_owners) - 1 and not _shutdown_requested:
- wait_time = random.randint(5, 15)
- log.info(f"等待 {wait_time}s 后处理下一个 Owner...", icon="wait")
- time.sleep(wait_time)
-
# 打印总结
print_summary(_current_results)
@@ -592,22 +596,10 @@ def run_single_team(team_index: int = 0):
log.info(f"单 Team 模式: {team['name']}", icon="start")
_current_results = []
- results, pending_owners = process_single_team(team)
+ # 单 Team 模式:序号为 1/1
+ results, _ = process_single_team(team, team_index=1, teams_total=1)
_current_results.extend(results)
- # 单 Team 模式下也处理 Owner
- if pending_owners:
- log.section(f"处理 Team Owner ({len(pending_owners)} 个)")
- for owner in pending_owners:
- owner_data = {
- "email": owner["email"],
- "password": owner.get("password", DEFAULT_PASSWORD),
- "status": owner.get("status", "team_owner"),
- "role": "owner"
- }
- owner_results = process_accounts([owner_data], team["name"])
- _current_results.extend(owner_results)
-
print_summary(_current_results)
return _current_results
diff --git a/team_service.py b/team_service.py
index fefe7e8..78b9a2a 100644
--- a/team_service.py
+++ b/team_service.py
@@ -113,7 +113,7 @@ def preload_all_account_ids() -> tuple[int, int]:
Returns:
tuple: (success_count, fail_count)
"""
- from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
+ import sys
success_count = 0
fail_count = 0
@@ -127,42 +127,71 @@ def preload_all_account_ids() -> tuple[int, int]:
log.success(f"所有 Team account_id 已缓存 ({len(teams_with_token)} 个)")
return len(teams_with_token), 0
+ total = len(teams_with_token)
log.info(f"预加载 {len(teams_need_fetch)} 个 Team 的 account_id...", icon="sync")
need_save = False
failed_teams = []
- with Progress(
- SpinnerColumn(),
- TextColumn("[progress.description]{task.description}"),
- BarColumn(),
- TaskProgressColumn(),
- TextColumn("{task.fields[status]}"),
- ) as progress:
- task = progress.add_task("加载中", total=len(teams_with_token), status="")
+ # 检测是否为 TTY 环境
+ is_tty = sys.stdout.isatty()
- for team in teams_with_token:
- progress.update(task, description=f"[cyan]{team['name']}", status="")
+ if is_tty:
+ # TTY 环境: 使用 rich 进度条
+ from rich.progress import Progress, SpinnerColumn, TextColumn, BarColumn, TaskProgressColumn
+
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ BarColumn(),
+ TaskProgressColumn(),
+ TextColumn("{task.fields[status]}"),
+ ) as progress:
+ task = progress.add_task("加载中", total=total, status="")
+
+ for team in teams_with_token:
+ progress.update(task, description=f"[cyan]{team['name']}", status="")
+
+ if team.get("account_id"):
+ success_count += 1
+ progress.update(task, advance=1, status="[green]✓ 已缓存")
+ continue
+
+ account_id = fetch_account_id(team, silent=True)
+ if account_id:
+ success_count += 1
+ progress.update(task, advance=1, status="[green]✓")
+ if team.get("format") == "new":
+ need_save = True
+ else:
+ fail_count += 1
+ failed_teams.append(team['name'])
+ progress.update(task, advance=1, status="[red]✗")
+ else:
+ # 非 TTY 环境 (systemd/journalctl): 使用普通日志输出
+ for idx, team in enumerate(teams_with_token, 1):
+ team_name = team['name']
if team.get("account_id"):
success_count += 1
- progress.update(task, advance=1, status="[green]✓ 已缓存")
+ log.info(f"预加载 [{idx}/{total}] {team_name}: ✓ 已缓存")
continue
account_id = fetch_account_id(team, silent=True)
if account_id:
success_count += 1
- progress.update(task, advance=1, status="[green]✓")
+ log.info(f"预加载 [{idx}/{total}] {team_name}: ✓ {account_id}")
if team.get("format") == "new":
need_save = True
else:
fail_count += 1
- failed_teams.append(team['name'])
- progress.update(task, advance=1, status="[red]✗")
+ failed_teams.append(team_name)
+ log.warning(f"预加载 [{idx}/{total}] {team_name}: ✗ 失败")
- # 输出失败的 team
- for name in failed_teams:
- log.warning(f"Team {name}: 获取 account_id 失败")
+ # 输出失败的 team (仅 TTY 环境,非 TTY 已在循环中输出)
+ if is_tty:
+ for name in failed_teams:
+ log.warning(f"Team {name}: 获取 account_id 失败")
# 持久化到 team.json
if need_save:
diff --git a/telegram_bot.py b/telegram_bot.py
index eaaf2f6..ccd9631 100644
--- a/telegram_bot.py
+++ b/telegram_bot.py
@@ -44,7 +44,7 @@ from config import (
S2A_GROUP_IDS,
S2A_ADMIN_KEY,
)
-from utils import load_team_tracker
+from utils import load_team_tracker, get_all_incomplete_accounts
from bot_notifier import BotNotifier, set_notifier, progress_finish
from s2a_service import s2a_get_dashboard_stats, format_dashboard_stats
from email_service import gptmail_service, unified_create_email
@@ -98,6 +98,7 @@ class ProvisionerBot:
("fingerprint", self.cmd_fingerprint),
("run", self.cmd_run),
("run_all", self.cmd_run_all),
+ ("resume", self.cmd_resume),
("stop", self.cmd_stop),
("logs", self.cmd_logs),
("logs_live", self.cmd_logs_live),
@@ -173,6 +174,7 @@ class ProvisionerBot:
BotCommand("config", "查看系统配置"),
BotCommand("run", "处理指定 Team"),
BotCommand("run_all", "处理所有 Team"),
+ BotCommand("resume", "继续处理未完成账号"),
BotCommand("stop", "停止当前任务"),
BotCommand("logs", "查看最近日志"),
BotCommand("dashboard", "查看 S2A 仪表盘"),
@@ -209,6 +211,7 @@ class ProvisionerBot:
🚀 任务控制:
/run <n> - 开始处理第 n 个 Team
/run_all - 开始处理所有 Team
+/resume - 继续处理未完成账号
/stop - 停止当前任务
⚙️ 配置管理:
@@ -729,6 +732,53 @@ class ProvisionerBot:
self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "全部"))
+ @admin_only
+ async def cmd_resume(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
+ """继续处理未完成的账号"""
+ if self.current_task and not self.current_task.done():
+ await update.message.reply_text(
+ f"⚠️ 任务正在运行: {self.current_team}\n使用 /stop 停止"
+ )
+ return
+
+ # 检查是否有未完成的账号
+ tracker = load_team_tracker()
+ all_incomplete = get_all_incomplete_accounts(tracker)
+
+ if not all_incomplete:
+ await update.message.reply_text("✅ 没有待处理的账号,所有任务已完成")
+ return
+
+ # 统计未完成账号
+ total_incomplete = sum(len(accs) for accs in all_incomplete.values())
+ teams_count = len(all_incomplete)
+
+ # 构建消息
+ lines = [
+ f"⏳ 发现 {total_incomplete} 个未完成账号",
+ f"涉及 {teams_count} 个 Team:",
+ ""
+ ]
+
+ for team_name, accounts in all_incomplete.items():
+ lines.append(f" • {team_name}: {len(accounts)} 个")
+
+ lines.append("")
+ lines.append("🚀 开始继续处理...")
+
+ await update.message.reply_text("\n".join(lines), parse_mode="HTML")
+
+ # 启动任务 (run_all_teams 会自动处理未完成的账号)
+ self.current_team = "继续处理"
+
+ loop = asyncio.get_event_loop()
+ self.current_task = loop.run_in_executor(
+ self.executor,
+ self._run_all_teams_task
+ )
+
+ self.current_task = asyncio.ensure_future(self._wrap_task(self.current_task, "继续处理"))
+
async def _wrap_task(self, task, team_name: str):
"""包装任务以处理完成通知"""
try:
@@ -736,8 +786,10 @@ class ProvisionerBot:
# 收集成功和失败的账号
success_accounts = [r.get("email") for r in (result or []) if r.get("status") == "success"]
failed_accounts = [r.get("email") for r in (result or []) if r.get("status") != "success"]
+ log.info(f"任务完成: {team_name}, 成功: {len(success_accounts)}, 失败: {len(failed_accounts)}")
await self.notifier.notify_task_completed(team_name, success_accounts, failed_accounts)
except Exception as e:
+ log.error(f"任务异常: {team_name}, 错误: {e}")
await self.notifier.notify_error(f"任务失败: {team_name}", str(e))
finally:
self.current_team = None
@@ -748,11 +800,31 @@ class ProvisionerBot:
"""执行单个 Team 任务 (在线程池中运行)"""
# 延迟导入避免循环依赖
from run import run_single_team
+ from team_service import preload_all_account_ids
+ from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
+ from config import DEFAULT_PASSWORD
+
+ # 预加载 account_id
+ preload_all_account_ids()
+ _tracker = load_team_tracker()
+ add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
+ save_team_tracker(_tracker)
+
return run_single_team(team_idx)
def _run_all_teams_task(self):
"""执行所有 Team 任务 (在线程池中运行)"""
from run import run_all_teams
+ from team_service import preload_all_account_ids
+ from utils import load_team_tracker, save_team_tracker, add_team_owners_to_tracker
+ from config import DEFAULT_PASSWORD
+
+ # 预加载 account_id
+ preload_all_account_ids()
+ _tracker = load_team_tracker()
+ add_team_owners_to_tracker(_tracker, DEFAULT_PASSWORD)
+ save_team_tracker(_tracker)
+
return run_all_teams()
@admin_only
@@ -770,8 +842,10 @@ class ProvisionerBot:
try:
import run
run._shutdown_requested = True
+ # 获取当前运行结果
+ current_results = run._current_results.copy() if run._current_results else []
except Exception:
- pass
+ current_results = []
# 2. 取消 asyncio 任务
if self.current_task and not self.current_task.done():
@@ -797,13 +871,51 @@ class ProvisionerBot:
# 清理进度跟踪
progress_finish()
- await update.message.reply_text(
- f"✅ 任务已强制停止\n\n"
- f"已停止: {task_name}\n"
- f"已清理浏览器进程\n\n"
- f"使用 /status 查看状态",
- parse_mode="HTML"
- )
+ # 6. 生成停止报告
+ report_lines = [
+ f"🛑 任务已停止",
+ f"任务: {task_name}",
+ "",
+ ]
+
+ # 本次运行结果
+ if current_results:
+ success_count = sum(1 for r in current_results if r.get("status") == "success")
+ failed_count = len(current_results) - success_count
+ report_lines.append(f"📊 本次运行结果:")
+ report_lines.append(f" 成功: {success_count}")
+ report_lines.append(f" 失败: {failed_count}")
+ report_lines.append("")
+
+ # 获取未完成账号信息
+ tracker = load_team_tracker()
+ all_incomplete = get_all_incomplete_accounts(tracker)
+
+ if all_incomplete:
+ total_incomplete = sum(len(accs) for accs in all_incomplete.values())
+ report_lines.append(f"⏳ 待继续处理: {total_incomplete} 个账号")
+
+ # 显示每个 Team 的未完成账号 (最多显示 3 个 Team)
+ shown_teams = 0
+ for team_name, accounts in all_incomplete.items():
+ if shown_teams >= 3:
+ remaining = len(all_incomplete) - 3
+ report_lines.append(f" ... 还有 {remaining} 个 Team")
+ break
+
+ report_lines.append(f" {team_name}: {len(accounts)} 个")
+ # 显示第一个待处理账号
+ if accounts:
+ first_acc = accounts[0]
+ report_lines.append(f" 下一个: {first_acc['email']}")
+ shown_teams += 1
+
+ report_lines.append("")
+ report_lines.append("💡 使用 /resume 继续处理")
+ else:
+ report_lines.append("✅ 没有待处理的账号")
+
+ await update.message.reply_text("\n".join(report_lines), parse_mode="HTML")
except Exception as e:
await update.message.reply_text(f"❌ 停止任务时出错: {e}")