This commit is contained in:
2026-01-30 06:10:31 +08:00
parent e43bd390f0
commit b7e658c567
2 changed files with 224 additions and 9 deletions

47
run.py
View File

@@ -133,7 +133,7 @@ def process_single_team(team: dict, team_index: int = 0, teams_total: int = 0) -
# 如果普通成员已完成目标数量,且没有未完成的 Owner跳过
owner_incomplete = len(owner_accounts)
if member_count >= ACCOUNTS_PER_TEAM and completed_count == member_count and owner_incomplete == 0:
print_team_summary(team)
# 已完成的 Team 直接跳过,不调用 API
log.success(f"{team_name} 已完成 {completed_count}/{ACCOUNTS_PER_TEAM} 个成员账号,跳过")
return results, []
@@ -1133,26 +1133,59 @@ def run_all_teams():
log.warning(f"发现 {total_incomplete} 个未完成账号,将优先处理")
_current_results = []
teams_total = len(TEAMS)
# 筛选需要处理的 Team (有未完成账号或还没开始处理的)
teams_to_process = []
for i, team in enumerate(TEAMS):
team_name = team["name"]
team_accounts = _tracker.get("teams", {}).get(team_name, [])
member_accounts = [acc for acc in team_accounts if acc.get("role") != "owner"]
owner_accounts = [acc for acc in team_accounts if acc.get("role") == "owner" and acc.get("status") != "completed"]
completed_count = sum(1 for acc in member_accounts if acc.get("status") == "completed")
member_count = len(member_accounts)
# 需要处理的条件:
# 1. 成员数量未达标
# 2. 有未完成的成员
# 3. 有未完成的 Owner
needs_processing = (
member_count < ACCOUNTS_PER_TEAM or
completed_count < member_count or
len(owner_accounts) > 0
)
if needs_processing:
teams_to_process.append((i, team))
if not teams_to_process:
log.success("所有 Team 已完成处理,无需继续")
return _current_results
skipped_count = len(TEAMS) - len(teams_to_process)
if skipped_count > 0:
log.info(f"跳过 {skipped_count} 个已完成的 Team处理剩余 {len(teams_to_process)}")
teams_total = len(teams_to_process)
with Timer("全部流程"):
# ========== 处理所有 Team (成员 + Owner 一起) ==========
for i, team in enumerate(TEAMS):
# ========== 处理需要处理的 Team (成员 + Owner 一起) ==========
for idx, (original_idx, team) in enumerate(teams_to_process):
if _shutdown_requested:
log.warning("检测到中断请求,停止处理...")
break
log.separator("", 60)
team_email = team.get('account') or team.get('owner_email', '')
log.highlight(f"Team {i + 1}/{teams_total}: {team['name']} ({team_email})", icon="team")
log.highlight(f"Team {idx + 1}/{teams_total}: {team['name']} ({team_email})", icon="team")
log.separator("", 60)
# 传递 Team 序号信息
results, _ = process_single_team(team, team_index=i + 1, teams_total=teams_total)
results, _ = process_single_team(team, team_index=idx + 1, teams_total=teams_total)
_current_results.extend(results)
# Team 之间的间隔
if i < teams_total - 1 and not _shutdown_requested:
if idx < teams_total - 1 and not _shutdown_requested:
wait_time = 3
log.countdown(wait_time, "下一个 Team")

View File

@@ -2011,7 +2011,7 @@ class ProvisionerBot:
pass # 忽略编辑失败
async def _finalize_import_batch(self, chat_id: int):
"""完成批量导入,发送最终结果"""
"""完成批量导入,发送最终结果,并自动验证 account_id"""
async with self._import_progress_lock:
if self._import_progress_message is None:
return
@@ -2027,10 +2027,187 @@ class ProvisionerBot:
# 更新最终进度
await self._update_import_progress(chat_id, is_final=True)
# 保存统计数据用于后续验证
added_count = self._import_batch_stats.get("total_added", 0)
# 重置状态
self._import_progress_message = None
self._reset_import_batch_stats()
# 如果有新增账号,自动验证 account_id 并移除无效账号
if added_count > 0:
await self._validate_and_cleanup_accounts(chat_id)
async def _validate_and_cleanup_accounts(self, chat_id: int):
"""验证新导入账号的 account_id移除无效账号
Args:
chat_id: Telegram 聊天 ID用于发送进度消息
"""
import json
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed
# 发送开始验证的消息
progress_msg = await self.app.bot.send_message(
chat_id=chat_id,
text="<b>🔍 正在验证账号...</b>\n\n⏳ 获取 account_id 中...",
parse_mode="HTML"
)
try:
# 读取当前 team.json
team_json_path = Path(TEAM_JSON_FILE)
if not team_json_path.exists():
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
text="❌ team.json 不存在",
parse_mode="HTML"
)
return
with open(team_json_path, "r", encoding="utf-8") as f:
accounts = json.load(f)
if not isinstance(accounts, list):
accounts = [accounts]
# 筛选需要验证的账号 (有 token 但没有 account_id)
accounts_to_verify = []
for i, acc in enumerate(accounts):
token = acc.get("token", "")
account_id = acc.get("account_id", "")
if token and not account_id:
accounts_to_verify.append((i, acc))
if not accounts_to_verify:
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
text="<b>✅ 验证完成</b>\n\n所有账号已有 account_id无需验证",
parse_mode="HTML"
)
return
total = len(accounts_to_verify)
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
text=f"<b>🔍 正在验证账号...</b>\n\n⏳ 验证 {total} 个账号的 account_id...",
parse_mode="HTML"
)
# 并行获取 account_id
valid_indices = set() # 验证成功的账号索引
failed_accounts = [] # 验证失败的账号信息
def verify_account(idx_acc_tuple):
"""验证单个账号"""
idx, acc = idx_acc_tuple
email = acc.get("account") or acc.get("email", "")
token = acc.get("token", "")
# 构造临时 team 配置用于获取 account_id
temp_team = {
"name": email.split("@")[0] if "@" in email else f"Team{idx}",
"auth_token": token,
"account_id": ""
}
from team_service import fetch_account_id
account_id = fetch_account_id(temp_team, silent=True)
return idx, email, account_id
# 使用线程池并行验证
max_workers = min(10, total)
completed_count = 0
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(verify_account, item): item for item in accounts_to_verify}
for future in as_completed(futures):
idx, email, account_id = future.result()
completed_count += 1
if account_id:
# 验证成功,更新 account_id
accounts[idx]["account_id"] = account_id
valid_indices.add(idx)
else:
# 验证失败
failed_accounts.append({"idx": idx, "email": email})
# 每处理 5 个更新一次进度
if completed_count % 5 == 0 or completed_count == total:
try:
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
text=f"<b>🔍 正在验证账号...</b>\n\n"
f"进度: {completed_count}/{total}\n"
f"✅ 成功: {len(valid_indices)}\n"
f"❌ 失败: {len(failed_accounts)}",
parse_mode="HTML"
)
except Exception:
pass
# 移除验证失败的账号
if failed_accounts:
failed_indices = {item["idx"] for item in failed_accounts}
accounts = [acc for i, acc in enumerate(accounts) if i not in failed_indices]
# 保存更新后的 team.json
with open(team_json_path, "w", encoding="utf-8") as f:
json.dump(accounts, f, ensure_ascii=False, indent=2)
# 重载配置
reload_config()
# 发送最终结果
lines = [
"<b>✅ 账号验证完成</b>",
"",
f"📊 验证结果:",
f" • 验证总数: {total}",
f" • 成功: {len(valid_indices)}",
f" • 失败并移除: {len(failed_accounts)}",
f" • team.json 剩余: {len(accounts)}",
]
if failed_accounts:
lines.append("")
lines.append("❌ 已移除的无效账号:")
for item in failed_accounts[:5]: # 最多显示 5 个
lines.append(f"{item['email']}")
if len(failed_accounts) > 5:
lines.append(f" ... 还有 {len(failed_accounts) - 5}")
lines.extend([
"",
"💡 使用 /run_all 或 /run &lt;n&gt; 开始处理"
])
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
text="\n".join(lines),
parse_mode="HTML"
)
except Exception as e:
log.error(f"验证账号失败: {e}")
try:
await self.app.bot.edit_message_text(
chat_id=chat_id,
message_id=progress_msg.message_id,
text=f"<b>❌ 验证失败</b>\n\n{str(e)}",
parse_mode="HTML"
)
except Exception:
pass
async def _import_batch_timeout_callback(self, context: ContextTypes.DEFAULT_TYPE):
"""批量导入超时回调 - 由 job_queue 调用"""
chat_id = context.job.data.get("chat_id")
@@ -2323,10 +2500,15 @@ class ProvisionerBot:
f"跳过 (重复): {skipped}\n"
f"team.json 总数: {len(existing_accounts)}\n\n"
f"✅ 配置已自动刷新\n"
f"使用 /run_all 或 /run &lt;n&gt; 开始处理",
f"⏳ 正在验证账号 account_id...",
parse_mode="HTML"
)
# 如果有新增账号,自动验证 account_id 并移除无效账号
if added > 0:
chat_id = update.effective_chat.id
await self._validate_and_cleanup_accounts(chat_id)
except Exception as e:
await update.message.reply_text(f"❌ 保存到 team.json 失败: {e}")