# ==================== Team service module ====================
# Handles ChatGPT Team invitation features: account_id discovery,
# invitations, and seat statistics.

from concurrent.futures import ThreadPoolExecutor, as_completed

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from config import (
    TEAMS,
    ACCOUNTS_PER_TEAM,
    REQUEST_TIMEOUT,
    USER_AGENT,
    BROWSER_HEADLESS,
    save_team_json
)
from logger import log


def create_session_with_retry():
    """Create an HTTP session that retries failed requests.

    Returns:
        requests.Session: session whose http:// and https:// adapters retry
        up to 5 times (backoff factor 1) on 429/500/502/503/504 for
        HEAD, GET, POST and OPTIONS requests.
    """
    session = requests.Session()
    retry_strategy = Retry(
        total=5,
        backoff_factor=1,
        status_forcelist=[429, 500, 502, 503, 504],
        allowed_methods=["HEAD", "GET", "POST", "OPTIONS"]
    )
    adapter = HTTPAdapter(max_retries=retry_strategy)
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


# Module-wide session shared by every request in this file.
http_session = create_session_with_retry()


def _bearer(auth_token: str) -> str:
    """Return auth_token prefixed with "Bearer " unless it already is."""
    if auth_token.startswith("Bearer "):
        return auth_token
    return f"Bearer {auth_token}"


def fetch_account_id(team: dict, silent: bool = False) -> str:
    """Fetch the account_id through the API (used by new-format configs).

    A cached ``team["account_id"]`` is returned without any request. On a
    successful lookup the id is written back into ``team["account_id"]``.

    Args:
        team: Team configuration.
        silent: Silent mode (emit no log output).

    Returns:
        str: account_id, or "" when the team has no auth_token or the lookup
        fails.
    """
    if team.get("account_id"):
        return team["account_id"]

    auth_token = team.get("auth_token", "")
    if not auth_token:
        return ""

    headers = {
        "accept": "*/*",
        "authorization": _bearer(auth_token),
        "content-type": "application/json",
        "user-agent": USER_AGENT,
    }

    try:
        # Use the accounts/check API to read the account information.
        response = http_session.get(
            "https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27",
            headers=headers,
            timeout=REQUEST_TIMEOUT
        )

        if response.status_code != 200:
            if not silent:
                log.warning(f"获取 account_id 失败: HTTP {response.status_code}")
            return ""

        accounts = response.json().get("accounts", {}) or {}

        # Prefer a Team account (plan_type contains "team").
        for acc_id, acc_info in accounts.items():
            if acc_id == "default":
                continue
            # Tolerate null "account"/"plan_type" values so that one odd
            # entry does not abort the scan of the remaining accounts.
            account_data = (acc_info or {}).get("account") or {}
            plan_type = account_data.get("plan_type") or ""
            if "team" in plan_type.lower():
                team["account_id"] = acc_id
                if not silent:
                    log.success(f"获取到 Team account_id: {acc_id[:8]}...")
                return acc_id

        # No Team account: fall back to the first non-default entry.
        for acc_id in accounts:
            if acc_id != "default":
                team["account_id"] = acc_id
                if not silent:
                    log.success(f"获取到 account_id: {acc_id[:8]}...")
                return acc_id

        if not silent:
            log.warning("获取 account_id 失败: 未找到可用账户")

    except Exception as e:
        # Best-effort lookup: any network/JSON error is reported as "".
        if not silent:
            log.warning(f"获取 account_id 失败: {e}")

    return ""


def _is_shutdown_requested() -> bool:
    """Check whether a stop request has been received.

    Reads ``run._shutdown_requested``; the import is done lazily inside the
    function, and any failure to import or read the flag counts as
    "not requested".
    """
    try:
        import run
        return run._shutdown_requested
    except Exception:
        return False


def preload_all_account_ids() -> tuple[int, int]:
    """Preload the account_id of every Team (in parallel).

    Intended to be called at program start so later calls do not need to
    fetch again. Only Teams that have a token are processed; Teams without
    a token are skipped.

    Returns:
        tuple: (success_count, fail_count); success_count includes Teams
        whose account_id was already cached.
    """
    # Import the Telegram progress interface; fall back to no-ops when the
    # notifier module is not available.
    try:
        from bot_notifier import (
            preload_progress_start,
            preload_progress_update,
            preload_progress_done,
            preload_progress_finish
        )
    except ImportError:
        def preload_progress_start(total): pass
        def preload_progress_update(team_name): pass
        def preload_progress_done(team_name, success): pass
        def preload_progress_finish(stopped=False): pass

    success_count = 0
    fail_count = 0

    # Only process Teams that have a token.
    teams_with_token = [t for t in TEAMS if t.get("auth_token")]
    teams_need_fetch = [t for t in teams_with_token if not t.get("account_id")]

    # Number of Teams whose account_id is already cached.
    cached_count = len(teams_with_token) - len(teams_need_fetch)

    if not teams_need_fetch:
        if teams_with_token:
            log.success(f"所有 Team account_id 已缓存 ({len(teams_with_token)} 个)")
        return len(teams_with_token), 0

    total = len(teams_need_fetch)
    log.info(f"并行预加载 {total} 个 Team 的 account_id...", icon="sync")

    # Start the Telegram progress display.
    preload_progress_start(total)

    need_save = False
    failed_teams = []
    stopped = False

    def fetch_one(team):
        """Fetch the account_id of one Team; returns (team, id, status)."""
        if _is_shutdown_requested():
            return team, None, "stopped"
        account_id = fetch_account_id(team, silent=True)
        return team, account_id, "ok" if account_id else "failed"

    # Fetch in parallel with a thread pool (at most 10 workers).
    max_workers = min(10, total)
    completed = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(fetch_one, team): team for team in teams_need_fetch}

        for future in as_completed(futures):
            # Check the stop flag before handling the next result.
            if _is_shutdown_requested():
                stopped = True
                # Cancel tasks that have not started yet.
                for pending in futures:
                    pending.cancel()
                break

            team, account_id, status = future.result()

            if status == "stopped":
                # The worker skipped this Team, so it is not counted as
                # completed.
                stopped = True
                break

            completed += 1
            team_name = team['name']

            if status == "ok":
                success_count += 1
                # Only new-format configs need the id written to team.json.
                if team.get("format") == "new":
                    need_save = True
                log.info(f"预加载 [{completed}/{total}] {team_name}: ✓ {account_id}")
                preload_progress_done(team_name, True)
            else:
                fail_count += 1
                failed_teams.append(team_name)
                log.warning(f"预加载 [{completed}/{total}] {team_name}: ✗ 失败")
                preload_progress_done(team_name, False)

    # Add the Teams that were already cached.
    success_count += cached_count

    # Finish the Telegram progress display.
    preload_progress_finish(stopped=stopped)

    # Persist to team.json.
    if need_save:
        if save_team_json():
            log.success("account_id 已保存到 team.json")

    if stopped:
        log.warning(f"预加载已停止 (已完成 {completed} 个)")
    elif fail_count == 0 and success_count > 0:
        log.success(f"所有 Team account_id 加载完成 ({success_count} 个)")
    elif fail_count > 0:
        log.warning(f"account_id 加载: 成功 {success_count}, 失败 {fail_count}")

    return success_count, fail_count


def build_invite_headers(team: dict) -> dict:
    """Build the headers for invitation requests.

    Args:
        team: Team configuration; must contain "auth_token".

    Returns:
        dict: request headers. "chatgpt-account-id" is included only when an
        account_id is configured or could be fetched.

    Raises:
        KeyError: if the team has no "auth_token" key.
    """
    auth_token = _bearer(team["auth_token"])

    # Try to fetch the account_id when it is not configured yet.
    account_id = team.get("account_id", "")
    if not account_id:
        account_id = fetch_account_id(team)

    headers = {
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9",
        "authorization": auth_token,
        "content-type": "application/json",
        "origin": "https://chatgpt.com",
        "referer": "https://chatgpt.com/",
        "user-agent": USER_AGENT,
        "sec-ch-ua": '"Chromium";v="135", "Not)A;Brand";v="99", "Google Chrome";v="135"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
    }

    if account_id:
        headers["chatgpt-account-id"] = account_id

    return headers


def invite_single_email(email: str, team: dict) -> tuple[bool, str]:
    """Invite a single email address to the Team.

    Args:
        email: Email address.
        team: Team configuration.

    Returns:
        tuple: (success, message)
    """
    # Resolve the id first: the URL needs it, and reading
    # team['account_id'] directly raises KeyError for teams that were never
    # preloaded.
    account_id = fetch_account_id(team)
    if not account_id:
        return False, "无法获取 account_id"

    headers = build_invite_headers(team)
    payload = {
        "email_addresses": [email],
        "role": "standard-user",
        "resend_emails": True
    }
    invite_url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites"

    try:
        response = http_session.post(invite_url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)

        if response.status_code != 200:
            return False, f"HTTP {response.status_code}: {response.text[:200]}"

        result = response.json()
        if result.get("account_invites"):
            return True, "邀请成功"
        if result.get("errored_emails"):
            return False, f"邀请错误: {result['errored_emails']}"
        # Neither list present: treated as sent.
        return True, "邀请已发送"

    except Exception as e:
        return False, str(e)


def batch_invite_to_team(emails: list, team: dict) -> dict:
    """Invite several email addresses to the Team in one request.

    Args:
        emails: List of email addresses (not modified).
        team: Team configuration.

    Returns:
        dict: {"success": [email, ...], "failed": [{"email", "error"}, ...]}
    """
    result = {
        "success": [],
        "failed": []
    }

    # Nothing to invite: skip the HTTP round trip.
    if not emails:
        return result

    # Resolve the id before it is used in the log line and the URL;
    # reading team['account_id'] directly raises KeyError for teams that
    # were never preloaded.
    account_id = fetch_account_id(team)
    if not account_id:
        log.error("批量邀请失败: 无法获取 account_id")
        result["failed"] = [{"email": addr, "error": "无法获取 account_id"} for addr in emails]
        return result

    log.info(f"批量邀请 {len(emails)} 个邮箱到 {team['name']} (ID: {account_id[:8]}...)", icon="email")

    headers = build_invite_headers(team)
    payload = {
        "email_addresses": emails,
        "role": "standard-user",
        "resend_emails": True
    }
    invite_url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites"

    try:
        response = http_session.post(invite_url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT)

        if response.status_code == 200:
            resp_data = response.json()
            invites = resp_data.get("account_invites")
            errored = resp_data.get("errored_emails")

            # Record successful invitations.
            for invite in invites or []:
                invited_email = invite.get("email_address", "")
                if invited_email:
                    result["success"].append(invited_email)
                    log.success(f"邀请成功: {invited_email}")

            # Record addresses the API rejected.
            for err in errored or []:
                err_email = err.get("email", "")
                err_msg = err.get("error", "Unknown error")
                if err_email:
                    result["failed"].append({"email": err_email, "error": err_msg})
                    log.error(f"邀请失败: {err_email} - {err_msg}")

            # Without explicit success/failure details, assume all succeeded.
            if not invites and not errored:
                # Copy so the result never aliases the caller's list.
                result["success"] = list(emails)
                for addr in emails:
                    log.success(f"邀请成功: {addr}")
        else:
            log.error(f"批量邀请失败: HTTP {response.status_code}")
            result["failed"] = [{"email": addr, "error": f"HTTP {response.status_code}"} for addr in emails]

    except Exception as exc:
        log.error(f"批量邀请异常: {exc}")
        # The loop variable must not reuse the exception's name: inside the
        # comprehension it would shadow it and the "error" field would hold
        # the email address instead of the exception text.
        # Addresses already recorded as successful are not marked failed.
        already_ok = set(result["success"])
        result["failed"] = [
            {"email": addr, "error": str(exc)}
            for addr in emails
            if addr not in already_ok
        ]

    log.info(f"邀请结果: 成功 {len(result['success'])}, 失败 {len(result['failed'])}")
    return result


def invite_single_to_team(email: str, team: dict) -> bool:
    """Invite a single email address to the Team.

    Args:
        email: Email address.
        team: Team configuration.

    Returns:
        bool: whether the invitation succeeded.
    """
    result = batch_invite_to_team([email], team)
    # Compare case-insensitively in case the API echoes the address with
    # different casing than it was submitted with.
    invited = {addr.casefold() for addr in result.get("success", [])}
    return email.casefold() in invited


def get_team_stats(team: dict) -> dict:
    """Fetch the Team's statistics (seat usage).

    Args:
        team: Team configuration.

    Returns:
        dict: {"seats_in_use": int, "seats_entitled": int,
        "pending_invites": int, "plan_type": str}, or {} on failure.
    """
    account_id = fetch_account_id(team)
    if not account_id:
        log.warning("获取 Team 统计失败: 无法获取 account_id")
        return {}

    headers = build_invite_headers(team)

    # Fetch the subscription information.
    subs_url = f"https://chatgpt.com/backend-api/subscriptions?account_id={account_id}"

    try:
        response = http_session.get(subs_url, headers=headers, timeout=REQUEST_TIMEOUT)

        if response.status_code == 200:
            data = response.json()
            return {
                "seats_in_use": data.get("seats_in_use", 0),
                "seats_entitled": data.get("seats_entitled", 0),
                "pending_invites": data.get("pending_invites", 0),
                "plan_type": data.get("plan_type", ""),
            }
        else:
            log.warning(f"获取 Team 统计失败: HTTP {response.status_code}")

    except Exception as e:
        log.warning(f"获取 Team 统计异常: {e}")

    return {}


def get_pending_invites(team: dict) -> list:
    """Fetch the Team's list of pending invitations.

    Requests the first page only (offset 0, limit 100).

    Args:
        team: Team configuration.

    Returns:
        list: pending invitations, or [] on failure.
    """
    account_id = fetch_account_id(team)
    if not account_id:
        log.warning("获取待处理邀请失败: 无法获取 account_id")
        return []

    headers = build_invite_headers(team)
    url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites?offset=0&limit=100&query="

    try:
        response = http_session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)

        if response.status_code == 200:
            data = response.json()
            return data.get("items", [])

        # Report non-200 responses, as get_team_stats does.
        log.warning(f"获取待处理邀请失败: HTTP {response.status_code}")

    except Exception as e:
        log.warning(f"获取待处理邀请异常: {e}")

    return []


def _available_from_stats(stats: dict) -> int:
    """Compute free seats from a stats dict, never below 0.

    Available = entitled - in use - pending invitations (pending
    invitations count as reserved seats).
    """
    seats_in_use = stats.get("seats_in_use", 0)
    # NOTE: get_team_stats always fills "seats_entitled" (defaulting to 0),
    # so the fallback of 5 only applies to stats dicts built elsewhere.
    seats_entitled = stats.get("seats_entitled", 5)
    pending_invites = stats.get("pending_invites", 0)
    return max(0, seats_entitled - seats_in_use - pending_invites)


def check_available_seats(team: dict) -> int:
    """Check how many seats are available in the Team.

    Args:
        team: Team configuration.

    Returns:
        int: number of available seats (0 when stats cannot be fetched).
    """
    stats = get_team_stats(team)

    if not stats:
        return 0

    return _available_from_stats(stats)


def print_team_summary(team: dict):
    """Log a summary of the Team's seat usage.

    Args:
        team: Team configuration.
    """
    # get_team_stats resolves and caches the account_id when possible, so
    # read it afterwards. The pending-invite count shown below comes from
    # the stats, so no separate pending-invites request is made.
    stats = get_team_stats(team)
    account_id = team.get("account_id") or ""

    log.info(f"{team['name']} 状态 (ID: {account_id[:8]}...)", icon="team")

    if stats:
        seats_in_use = stats.get('seats_in_use', 0)
        seats_entitled = stats.get('seats_entitled', 5)
        pending_count = stats.get('pending_invites', 0)

        seats_info = f"席位: {seats_in_use}/{seats_entitled}"
        pending_info = f"待处理邀请: {pending_count}"
        available_info = f"可用席位: {_available_from_stats(stats)}"
        log.info(f"{seats_info} | {pending_info} | {available_info}")
    else:
        log.warning("无法获取状态信息")