From b7e658c5674b24b7fd3e7c872e5cf680054c665c Mon Sep 17 00:00:00 2001 From: kyx236 Date: Fri, 30 Jan 2026 06:10:31 +0800 Subject: [PATCH] 4 --- run.py | 47 ++++++++++-- telegram_bot.py | 186 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 224 insertions(+), 9 deletions(-) diff --git a/run.py b/run.py index 5d071c9..67616b9 100644 --- a/run.py +++ b/run.py @@ -133,7 +133,7 @@ def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) - # 如果普通成员已完成目标数量,且没有未完成的 Owner,跳过 owner_incomplete = len(owner_accounts) if member_count >= ACCOUNTS_PER_TEAM and completed_count == member_count and owner_incomplete == 0: - print_team_summary(team) + # 已完成的 Team 直接跳过,不调用 API log.success(f"{team_name} 已完成 {completed_count}/{ACCOUNTS_PER_TEAM} 个成员账号,跳过") return results, [] @@ -1133,26 +1133,59 @@ def run_all_teams(): log.warning(f"发现 {total_incomplete} 个未完成账号,将优先处理") _current_results = [] - teams_total = len(TEAMS) + + # 筛选需要处理的 Team (有未完成账号或还没开始处理的) + teams_to_process = [] + for i, team in enumerate(TEAMS): + team_name = team["name"] + team_accounts = _tracker.get("teams", {}).get(team_name, []) + member_accounts = [acc for acc in team_accounts if acc.get("role") != "owner"] + owner_accounts = [acc for acc in team_accounts if acc.get("role") == "owner" and acc.get("status") != "completed"] + + completed_count = sum(1 for acc in member_accounts if acc.get("status") == "completed") + member_count = len(member_accounts) + + # 需要处理的条件: + # 1. 成员数量未达标 + # 2. 有未完成的成员 + # 3. 有未完成的 Owner + needs_processing = ( + member_count < ACCOUNTS_PER_TEAM or + completed_count < member_count or + len(owner_accounts) > 0 + ) + + if needs_processing: + teams_to_process.append((i, team)) + + if not teams_to_process: + log.success("所有 Team 已完成处理,无需继续") + return _current_results + + skipped_count = len(TEAMS) - len(teams_to_process) + if skipped_count > 0: + log.info(f"跳过 {skipped_count} 个已完成的 Team,处理剩余 {len(teams_to_process)} 个") + + teams_total = len(teams_to_process) with Timer("全部流程"): - # ========== 处理所有 Team (成员 + Owner 一起) ========== - for i, team in enumerate(TEAMS): + # ========== 处理需要处理的 Team (成员 + Owner 一起) ========== + for idx, (original_idx, team) in enumerate(teams_to_process): if _shutdown_requested: log.warning("检测到中断请求,停止处理...") break log.separator("★", 60) team_email = team.get('account') or team.get('owner_email', '') - log.highlight(f"Team {i + 1}/{teams_total}: {team['name']} ({team_email})", icon="team") + log.highlight(f"Team {idx + 1}/{teams_total}: {team['name']} ({team_email})", icon="team") log.separator("★", 60) # 传递 Team 序号信息 - results, _ = process_single_team(team, team_index=i + 1, teams_total=teams_total) + results, _ = process_single_team(team, team_index=idx + 1, teams_total=teams_total) _current_results.extend(results) # Team 之间的间隔 - if i < teams_total - 1 and not _shutdown_requested: + if idx < teams_total - 1 and not _shutdown_requested: wait_time = 3 log.countdown(wait_time, "下一个 Team") diff --git a/telegram_bot.py b/telegram_bot.py index fb956c5..cb5749f 100644 --- a/telegram_bot.py +++ b/telegram_bot.py @@ -2011,7 +2011,7 @@ class ProvisionerBot: pass # 忽略编辑失败 async def _finalize_import_batch(self, chat_id: int): - """完成批量导入,发送最终结果""" + """完成批量导入,发送最终结果,并自动验证 account_id""" async with self._import_progress_lock: if self._import_progress_message is None: return @@ -2027,10 +2027,187 @@ class ProvisionerBot: # 更新最终进度 await self._update_import_progress(chat_id, is_final=True) + # 保存统计数据用于后续验证 + added_count = self._import_batch_stats.get("total_added", 0) + # 重置状态 self._import_progress_message = None self._reset_import_batch_stats() + # 如果有新增账号,自动验证 account_id 并移除无效账号 + if added_count > 0: + await self._validate_and_cleanup_accounts(chat_id) + + async def _validate_and_cleanup_accounts(self, chat_id: int): + """验证新导入账号的 account_id,移除无效账号 + + Args: + chat_id: Telegram 聊天 ID,用于发送进度消息 + """ + import json + from pathlib import Path + from concurrent.futures import ThreadPoolExecutor, as_completed + + # 发送开始验证的消息 + progress_msg = await self.app.bot.send_message( + chat_id=chat_id, + text="🔍 正在验证账号...\n\n⏳ 获取 account_id 中...", + parse_mode="HTML" + ) + + try: + # 读取当前 team.json + team_json_path = Path(TEAM_JSON_FILE) + if not team_json_path.exists(): + await self.app.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_msg.message_id, + text="❌ team.json 不存在", + parse_mode="HTML" + ) + return + + with open(team_json_path, "r", encoding="utf-8") as f: + accounts = json.load(f) + if not isinstance(accounts, list): + accounts = [accounts] + + # 筛选需要验证的账号 (有 token 但没有 account_id) + accounts_to_verify = [] + for i, acc in enumerate(accounts): + token = acc.get("token", "") + account_id = acc.get("account_id", "") + if token and not account_id: + accounts_to_verify.append((i, acc)) + + if not accounts_to_verify: + await self.app.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_msg.message_id, + text="✅ 验证完成\n\n所有账号已有 account_id,无需验证", + parse_mode="HTML" + ) + return + + total = len(accounts_to_verify) + await self.app.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_msg.message_id, + text=f"🔍 正在验证账号...\n\n⏳ 验证 {total} 个账号的 account_id...", + parse_mode="HTML" + ) + + # 并行获取 account_id + valid_indices = set() # 验证成功的账号索引 + failed_accounts = [] # 验证失败的账号信息 + + def verify_account(idx_acc_tuple): + """验证单个账号""" + idx, acc = idx_acc_tuple + email = acc.get("account") or acc.get("email", "") + token = acc.get("token", "") + + # 构造临时 team 配置用于获取 account_id + temp_team = { + "name": email.split("@")[0] if "@" in email else f"Team{idx}", + "auth_token": token, + "account_id": "" + } + + from team_service import fetch_account_id + account_id = fetch_account_id(temp_team, silent=True) + + return idx, email, account_id + + # 使用线程池并行验证 + max_workers = min(10, total) + completed_count = 0 + + with ThreadPoolExecutor(max_workers=max_workers) as executor: + futures = {executor.submit(verify_account, item): item for item in accounts_to_verify} + + for future in as_completed(futures): + idx, email, account_id = future.result() + completed_count += 1 + + if account_id: + # 验证成功,更新 account_id + accounts[idx]["account_id"] = account_id + valid_indices.add(idx) + else: + # 验证失败 + failed_accounts.append({"idx": idx, "email": email}) + + # 每处理 5 个更新一次进度 + if completed_count % 5 == 0 or completed_count == total: + try: + await self.app.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_msg.message_id, + text=f"🔍 正在验证账号...\n\n" + f"进度: {completed_count}/{total}\n" + f"✅ 成功: {len(valid_indices)}\n" + f"❌ 失败: {len(failed_accounts)}", + parse_mode="HTML" + ) + except Exception: + pass + + # 移除验证失败的账号 + if failed_accounts: + failed_indices = {item["idx"] for item in failed_accounts} + accounts = [acc for i, acc in enumerate(accounts) if i not in failed_indices] + + # 保存更新后的 team.json + with open(team_json_path, "w", encoding="utf-8") as f: + json.dump(accounts, f, ensure_ascii=False, indent=2) + + # 重载配置 + reload_config() + + # 发送最终结果 + lines = [ + "✅ 账号验证完成", + "", + f"📊 验证结果:", + f" • 验证总数: {total}", + f" • 成功: {len(valid_indices)}", + f" • 失败并移除: {len(failed_accounts)}", + f" • team.json 剩余: {len(accounts)}", + ] + + if failed_accounts: + lines.append("") + lines.append("❌ 已移除的无效账号:") + for item in failed_accounts[:5]: # 最多显示 5 个 + lines.append(f" • {item['email']}") + if len(failed_accounts) > 5: + lines.append(f" ... 还有 {len(failed_accounts) - 5} 个") + + lines.extend([ + "", + "💡 使用 /run_all 或 /run <n> 开始处理" + ]) + + await self.app.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_msg.message_id, + text="\n".join(lines), + parse_mode="HTML" + ) + + except Exception as e: + log.error(f"验证账号失败: {e}") + try: + await self.app.bot.edit_message_text( + chat_id=chat_id, + message_id=progress_msg.message_id, + text=f"❌ 验证失败\n\n{str(e)}", + parse_mode="HTML" + ) + except Exception: + pass + async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE): """批量导入超时回调 - 由 job_queue 调用""" chat_id = context.job.data.get("chat_id") @@ -2323,10 +2500,15 @@ class ProvisionerBot: f"跳过 (重复): {skipped}\n" f"team.json 总数: {len(existing_accounts)}\n\n" f"✅ 配置已自动刷新\n" - f"使用 /run_all 或 /run <n> 开始处理", + f"⏳ 正在验证账号 account_id...", parse_mode="HTML" ) + # 如果有新增账号,自动验证 account_id 并移除无效账号 + if added > 0: + chat_id = update.effective_chat.id + await self._validate_and_cleanup_accounts(chat_id) + except Exception as e: await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}")