Files
codexTool/team_service.py
2026-01-20 21:15:50 +08:00

454 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ==================== Team 服务模块 ====================
# 处理 ChatGPT Team 邀请相关功能
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from config import (
TEAMS,
ACCOUNTS_PER_TEAM,
REQUEST_TIMEOUT,
USER_AGENT,
BROWSER_HEADLESS,
save_team_json
)
from logger import log
def create_session_with_retry():
    """Build a ``requests.Session`` whose adapters retry failed requests.

    A single ``HTTPAdapter`` carrying a ``Retry`` policy (total of 5 retries,
    backoff factor 1, triggered by HTTP 429/500/502/503/504 for
    HEAD/GET/POST/OPTIONS) is mounted for both the https and http schemes.

    Returns:
        The configured session.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=5,
            backoff_factor=1,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "POST", "OPTIONS"],
        )
    )
    sess = requests.Session()
    # The same adapter instance serves both schemes.
    for scheme in ("https://", "http://"):
        sess.mount(scheme, retrying_adapter)
    return sess
# Module-level session with retry support; the request helpers in this
# module send their HTTP calls through it.
http_session = create_session_with_retry()
def fetch_account_id(team: dict, silent: bool = False) -> str:
    """Resolve a team's account_id through the accounts/check API.

    A non-empty ``team["account_id"]`` is returned immediately without any
    network access. Otherwise the endpoint is queried with the team's auth
    token. The first non-"default" account whose ``plan_type`` contains
    "team" is preferred; if none matches, the first non-"default" account is
    used. The resolved id is written back into ``team["account_id"]``.

    Args:
        team: Team configuration dict; reads ``account_id`` and
            ``auth_token``, and may set ``account_id``.
        silent: When True, no log output is produced.

    Returns:
        The account id, or "" when there is no token, the request fails, or
        the response contains no usable account.
    """
    cached_id = team.get("account_id")
    if cached_id:
        return cached_id

    auth_token = team.get("auth_token", "")
    if not auth_token:
        return ""
    if not auth_token.startswith("Bearer "):
        auth_token = f"Bearer {auth_token}"

    headers = {
        "accept": "*/*",
        "authorization": auth_token,
        "content-type": "application/json",
        "user-agent": USER_AGENT,
    }

    try:
        response = http_session.get(
            "https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27",
            headers=headers,
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 200:
            if not silent:
                log.warning(f"获取 account_id 失败: HTTP {response.status_code}")
            return ""

        accounts = response.json().get("accounts") or {}
        candidate_ids = [acc_id for acc_id in accounts if acc_id != "default"]

        chosen_id, label = "", "account_id"
        for acc_id in candidate_ids:
            # Tolerate null / malformed entries: one odd account must not
            # abort the whole lookup (and with it the fallback below).
            acc_info = accounts[acc_id]
            account_data = acc_info.get("account") if isinstance(acc_info, dict) else None
            plan_type = account_data.get("plan_type") if isinstance(account_data, dict) else None
            if isinstance(plan_type, str) and "team" in plan_type.lower():
                chosen_id, label = acc_id, "Team account_id"
                break
        else:
            # No Team-plan account found: fall back to the first non-default one.
            if candidate_ids:
                chosen_id = candidate_ids[0]

        if chosen_id:
            team["account_id"] = chosen_id
            if not silent:
                log.success(f"获取到 {label}: {chosen_id[:8]}...")
            return chosen_id

        if not silent:
            log.warning("获取 account_id 失败: 未找到可用账户")
    except Exception as e:
        # Best-effort lookup: any network / parsing error yields "".
        if not silent:
            log.warning(f"获取 account_id 失败: {e}")
    return ""
def _is_shutdown_requested() -> bool:
"""检查是否收到停止请求"""
try:
import run
return run._shutdown_requested
except Exception:
return False
def preload_all_account_ids() -> tuple[int, int]:
    """Preload the account_id of every configured Team in parallel.

    Only teams in ``TEAMS`` that have an ``auth_token`` are considered, and
    only those without an ``account_id`` trigger a lookup. Lookups run on a
    thread pool of at most 10 workers. If any successfully resolved team has
    ``format == "new"``, the configuration is persisted via
    ``save_team_json()``. A shutdown request stops processing early and
    cancels tasks that have not started yet.

    Returns:
        ``(success_count, fail_count)``; ``success_count`` includes teams
        whose account_id was already cached.
    """
    from concurrent.futures import ThreadPoolExecutor, as_completed

    teams_with_token = [t for t in TEAMS if t.get("auth_token")]
    teams_need_fetch = [t for t in teams_with_token if not t.get("account_id")]
    cached_count = len(teams_with_token) - len(teams_need_fetch)

    if not teams_need_fetch:
        if teams_with_token:
            log.success(f"所有 Team account_id 已缓存 ({len(teams_with_token)} 个)")
        return len(teams_with_token), 0

    total = len(teams_need_fetch)
    log.info(f"并行预加载 {total} 个 Team 的 account_id...", icon="sync")

    success_count = 0
    fail_count = 0
    completed = 0
    need_save = False
    stopped = False

    def fetch_one(team):
        """Resolve one team's account_id; returns (team, account_id, status)."""
        if _is_shutdown_requested():
            return team, None, "stopped"
        account_id = fetch_account_id(team, silent=True)
        return team, account_id, "ok" if account_id else "failed"

    with ThreadPoolExecutor(max_workers=min(10, total)) as executor:
        futures = [executor.submit(fetch_one, team) for team in teams_need_fetch]
        for future in as_completed(futures):
            if _is_shutdown_requested():
                stopped = True
                break

            team, account_id, status = future.result()
            if status == "stopped":
                # The worker saw the shutdown flag; it did no real work, so
                # it is not counted as completed.
                stopped = True
                break

            completed += 1
            if status == "ok":
                success_count += 1
                if team.get("format") == "new":
                    need_save = True
                log.info(f"预加载 [{completed}/{total}] {team['name']}: ✓ {account_id}")
            else:
                fail_count += 1
                log.warning(f"预加载 [{completed}/{total}] {team['name']}: ✗ 失败")

        if stopped:
            # Cancel whatever has not started yet, on every stop path, so
            # leaving the executor context does not wait on queued work.
            for pending in futures:
                pending.cancel()

    # Teams that already had an account_id count as successes.
    success_count += cached_count

    # Persist newly resolved ids to team.json.
    if need_save and save_team_json():
        log.success("account_id 已保存到 team.json")

    if stopped:
        log.warning(f"预加载已停止 (已完成 {completed} 个)")
    elif fail_count == 0 and success_count > 0:
        log.success(f"所有 Team account_id 加载完成 ({success_count} 个)")
    elif fail_count > 0:
        log.warning(f"account_id 加载: 成功 {success_count}, 失败 {fail_count}")

    return success_count, fail_count
def build_invite_headers(team: dict) -> dict:
    """Assemble the HTTP headers used for Team invite / admin requests.

    The team's ``auth_token`` is normalised to carry a "Bearer " prefix. When
    the team has no ``account_id`` yet, ``fetch_account_id`` is asked for
    one. The ``chatgpt-account-id`` header is added only when an id is
    available.

    Args:
        team: Team configuration dict; must contain ``auth_token``.

    Returns:
        The header dict.
    """
    raw_token = team["auth_token"]
    bearer = raw_token if raw_token.startswith("Bearer ") else f"Bearer {raw_token}"

    # Use the known id, otherwise try to look it up.
    account_id = team.get("account_id", "") or fetch_account_id(team)

    request_headers = {
        "accept": "*/*",
        "accept-language": "zh-CN,zh;q=0.9",
        "authorization": bearer,
        "content-type": "application/json",
        "origin": "https://chatgpt.com",
        "referer": "https://chatgpt.com/",
        "user-agent": USER_AGENT,
        "sec-ch-ua": '"Chromium";v="135", "Not)A;Brand";v="99", "Google Chrome";v="135"',
        "sec-ch-ua-mobile": "?0",
        "sec-ch-ua-platform": '"Windows"',
    }
    if account_id:
        request_headers["chatgpt-account-id"] = account_id
    return request_headers
def invite_single_email(email: str, team: dict) -> tuple[bool, str]:
    """Invite a single email address to a Team.

    Args:
        email: Address to invite.
        team: Team configuration dict.

    Returns:
        ``(success, message)``. A missing account_id, a non-200 response or
        a request exception is reported as ``(False, reason)`` rather than
        raised.
    """
    # Building the headers may fill in team["account_id"] as a side effect
    # of the lookup, so read the id only afterwards.
    headers = build_invite_headers(team)
    account_id = team.get("account_id")
    if not account_id:
        # Without an id the URL would be malformed (or indexing would raise).
        return False, "缺少 account_id, 无法发送邀请"

    payload = {
        "email_addresses": [email],
        "role": "standard-user",
        "resend_emails": True,
    }
    invite_url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites"

    try:
        response = http_session.post(
            invite_url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
        )
        if response.status_code != 200:
            return False, f"HTTP {response.status_code}: {response.text[:200]}"

        result = response.json()
        if result.get("account_invites"):
            return True, "邀请成功"
        if result.get("errored_emails"):
            return False, f"邀请错误: {result['errored_emails']}"
        # 200 with neither list populated is treated as sent.
        return True, "邀请已发送"
    except Exception as e:
        return False, str(e)
def batch_invite_to_team(emails: list, team: dict) -> dict:
    """Invite several email addresses to a Team in one request.

    Args:
        emails: Addresses to invite. The list is never mutated or aliased in
            the returned result.
        team: Team configuration dict.

    Returns:
        ``{"success": [email, ...], "failed": [{"email": ..., "error": ...}, ...]}``.
        A missing account_id, a non-200 response or a request exception
        marks the affected emails as failed instead of raising.
    """
    result = {"success": [], "failed": []}

    # Build headers first: this may fill in team["account_id"], which both
    # the log line and the URL below depend on.
    headers = build_invite_headers(team)
    account_id = team.get("account_id") or ""

    if not account_id:
        log.error("批量邀请失败: 缺少 account_id")
        result["failed"] = [{"email": e, "error": "缺少 account_id"} for e in emails]
    else:
        log.info(
            f"批量邀请 {len(emails)} 个邮箱到 {team['name']} (ID: {account_id[:8]}...)",
            icon="email",
        )
        payload = {
            "email_addresses": emails,
            "role": "standard-user",
            "resend_emails": True,
        }
        invite_url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites"

        try:
            response = http_session.post(
                invite_url, headers=headers, json=payload, timeout=REQUEST_TIMEOUT
            )
            if response.status_code == 200:
                resp_data = response.json()
                invites = resp_data.get("account_invites") or []
                errored = resp_data.get("errored_emails") or []

                # Successful invites.
                for invite in invites:
                    invited_email = invite.get("email_address", "") if isinstance(invite, dict) else ""
                    if invited_email:
                        result["success"].append(invited_email)
                        log.success(f"邀请成功: {invited_email}")

                # Rejected addresses; entries may be dicts or bare strings.
                for err in errored:
                    if isinstance(err, dict):
                        err_email = err.get("email", "")
                        err_msg = err.get("error", "Unknown error")
                    else:
                        err_email, err_msg = str(err), "Unknown error"
                    if err_email:
                        result["failed"].append({"email": err_email, "error": err_msg})
                        log.error(f"邀请失败: {err_email} - {err_msg}")

                # No explicit success/failure info: assume all succeeded.
                if not invites and not errored:
                    result["success"] = list(emails)
                    for email in emails:
                        log.success(f"邀请成功: {email}")
            else:
                log.error(f"批量邀请失败: HTTP {response.status_code}")
                result["failed"] = [
                    {"email": e, "error": f"HTTP {response.status_code}"} for e in emails
                ]
        except Exception as e:
            log.error(f"批量邀请异常: {e}")
            # Keep successes recorded before the error; fail only the rest,
            # so no address appears in both lists.
            already_invited = set(result["success"])
            result["failed"] = [
                {"email": addr, "error": str(e)}
                for addr in emails
                if addr not in already_invited
            ]

    log.info(f"邀请结果: 成功 {len(result['success'])}, 失败 {len(result['failed'])}")
    return result
def invite_single_to_team(email: str, team: dict) -> bool:
    """Invite one email address to a Team via the batch invite path.

    Args:
        email: Address to invite.
        team: Team configuration dict.

    Returns:
        True when ``email`` appears in the batch result's "success" list.
    """
    outcome = batch_invite_to_team([email], team)
    invited = outcome.get("success", [])
    return email in invited
def get_team_stats(team: dict) -> dict:
    """Fetch seat-usage statistics for a Team from the subscriptions API.

    Args:
        team: Team configuration dict.

    Returns:
        ``{"seats_in_use", "seats_entitled", "pending_invites", "plan_type"}``
        on success, or ``{}`` when the account_id is unavailable, the
        response is not 200, or the request raises.
    """
    # Building the headers may fill in team["account_id"] as a side effect
    # of the lookup, so read the id only afterwards.
    headers = build_invite_headers(team)
    account_id = team.get("account_id")
    if not account_id:
        log.warning("获取 Team 统计失败: 缺少 account_id")
        return {}

    subs_url = f"https://chatgpt.com/backend-api/subscriptions?account_id={account_id}"
    try:
        response = http_session.get(subs_url, headers=headers, timeout=REQUEST_TIMEOUT)
        if response.status_code == 200:
            data = response.json()
            return {
                "seats_in_use": data.get("seats_in_use", 0),
                "seats_entitled": data.get("seats_entitled", 0),
                "pending_invites": data.get("pending_invites", 0),
                "plan_type": data.get("plan_type", ""),
            }
        log.warning(f"获取 Team 统计失败: HTTP {response.status_code}")
    except Exception as e:
        log.warning(f"获取 Team 统计异常: {e}")
    return {}
def get_pending_invites(team: dict) -> list:
    """Fetch the pending invites of a Team (first page, up to 100 entries).

    Args:
        team: Team configuration dict.

    Returns:
        The ``items`` list from the invites endpoint, or ``[]`` when the
        account_id is unavailable, the response is not 200, or the request
        raises.
    """
    # Building the headers may fill in team["account_id"] as a side effect
    # of the lookup, so read the id only afterwards.
    headers = build_invite_headers(team)
    account_id = team.get("account_id")
    if not account_id:
        log.warning("获取待处理邀请失败: 缺少 account_id")
        return []

    url = f"https://chatgpt.com/backend-api/accounts/{account_id}/invites?offset=0&limit=100&query="
    try:
        response = http_session.get(url, headers=headers, timeout=REQUEST_TIMEOUT)
        if response.status_code == 200:
            data = response.json()
            return data.get("items", [])
        # Report HTTP failures instead of silently returning an empty list.
        log.warning(f"获取待处理邀请失败: HTTP {response.status_code}")
    except Exception as e:
        log.warning(f"获取待处理邀请异常: {e}")
    return []
def check_available_seats(team: dict) -> int:
    """Compute how many seats of a Team are still free.

    Free seats = entitled seats - seats in use - pending invites (pending
    invites are counted as already reserved). The result is never negative.

    Args:
        team: Team configuration dict.

    Returns:
        Number of available seats; 0 when no statistics could be obtained.
    """
    stats = get_team_stats(team)
    if not stats:
        return 0

    entitled = stats.get("seats_entitled", 5)  # falls back to 5 seats
    used = stats.get("seats_in_use", 0)
    reserved = stats.get("pending_invites", 0)

    remaining = entitled - used - reserved
    return remaining if remaining > 0 else 0
def print_team_summary(team: dict):
    """Log a one-line seat summary for a Team.

    Reads the statistics from ``get_team_stats`` and logs seats in use,
    entitled seats, pending invites and the derived number of free seats.
    Logs a warning when no statistics are available.

    Args:
        team: Team configuration dict.
    """
    # The pending count comes from the stats payload, so the separate
    # pending-invites request (whose result was never used) is not made.
    stats = get_team_stats(team)

    # Tolerate an unresolved account_id instead of raising while logging.
    account_id = team.get("account_id") or ""
    log.info(f"{team['name']} 状态 (ID: {account_id[:8]}...)", icon="team")

    if not stats:
        log.warning("无法获取状态信息")
        return

    seats_in_use = stats.get("seats_in_use", 0)
    seats_entitled = stats.get("seats_entitled", 5)
    pending_count = stats.get("pending_invites", 0)
    # Free seats = entitled - in use - pending invites.
    available = seats_entitled - seats_in_use - pending_count

    seats_info = f"席位: {seats_in_use}/{seats_entitled}"
    pending_info = f"待处理邀请: {pending_count}"
    available_info = f"可用席位: {max(0, available)}"
    log.info(f"{seats_info} | {pending_info} | {available_info}")