"""
autoClaude Telegram Bot
基于 python-telegram-bot v21+ 的异步 Bot,封装 Claude 注册和 CC 检查流程。
启动方式: uv run python bot.py
"""
import asyncio
import json
import logging
import os
import random
import string
import time
import threading
from functools import wraps
from telegram import Update, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
MessageHandler,
ContextTypes,
filters,
)
from config import (
TG_BOT_TOKEN,
TG_ALLOWED_USERS,
MAIL_SYSTEMS,
)
from mail_service import MailPool, extract_magic_link
from stripe_token import StripeTokenizer
from gift_checker import GiftChecker
from claude_auth import attack_claude, finalize_login
import account_store
import proxy_pool
# --- 日志配置 ---
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
# 降低 httpx 轮询日志级别,减少刷屏
logging.getLogger("httpx").setLevel(logging.WARNING)
# --- 全局状态 ---
_task_lock = threading.Lock()
_task_running = False
_task_name = ""
_stop_event = threading.Event()
# ============================================================
# 工具函数
# ============================================================
def restricted(func):
"""权限控制装饰器:仅允许 TG_ALLOWED_USERS 中的用户使用"""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
if TG_ALLOWED_USERS and user_id not in TG_ALLOWED_USERS:
await update.message.reply_text("⛔ 你没有权限使用此 Bot。")
return
return await func(update, context, *args, **kwargs)
return wrapper
def _set_task(name: str) -> bool:
"""尝试设置任务锁,返回是否成功"""
global _task_running, _task_name
with _task_lock:
if _task_running:
return False
_task_running = True
_task_name = name
_stop_event.clear()
return True
def _clear_task():
"""释放任务锁"""
global _task_running, _task_name
with _task_lock:
_task_running = False
_task_name = ""
_stop_event.clear()
def _is_stopped() -> bool:
"""检查是否收到停止信号"""
return _stop_event.is_set()
async def _edit_or_send(msg, text: str):
"""安全地编辑消息,如果失败则发送新消息"""
try:
await msg.edit_text(text, parse_mode="HTML")
except Exception:
pass
def _progress_bar(current: int, total: int, width: int = 12) -> str:
"""生成可视化进度条 ▓▓▓▓░░░░ 3/10 (30%)"""
if total <= 0:
return ""
pct = current / total
filled = round(width * pct)
bar = "▓" * filled + "░" * (width - filled)
return f"{bar} {current}/{total} ({round(pct * 100)}%)"
# ============================================================
# 命令处理
# ============================================================
@restricted
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/start — 欢迎信息"""
welcome = (
"🤖 autoClaude Bot\n\n"
"可用命令:\n"
" /register [N] — 注册 Claude 账号\n"
" /check <卡号|月|年|CVC> — CC 检查\n"
" 📎 发送 .txt 文件 — 批量 CC 检查\n"
" /accounts — 查看账号 | /delete — 删除账号\n"
" /verify — 验证 SK 有效性\n"
" /stats — 统计面板\n"
" /stop — 中断当前任务\n"
" /proxy on|off — 代理开关\n"
" /proxytest — 测试代理 | /proxystatus — 代理状态\n"
" /mailstatus — 邮件系统状态\n"
" /status — 任务状态 | /help — 帮助\n\n"
f"👤 你的用户 ID: {update.effective_user.id}"
)
await update.message.reply_text(welcome, parse_mode="HTML")
@restricted
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/help — 命令列表"""
text = (
"📖 命令说明\n\n"
"📝 注册与账号\n"
" /register [N] — 注册 N 个账号\n"
" /accounts — 查看已注册账号\n"
" /delete <序号|邮箱> — 删除账号\n"
" /verify — 验证 SK 有效性\n\n"
"💳 CC 检查\n"
" /check — 单张检查\n"
" 📎 发送 .txt 文件 — 批量检查\n\n"
"🛠 工具与状态\n"
" /stop — 中断当前任务\n"
" /stats — 统计面板\n"
" /status — 任务状态\n\n"
"🌐 代理与邮件\n"
" /proxy on|off — 开关代理\n"
" /proxytest — 测试代理\n"
" /proxystatus — 代理池状态\n"
" /mailstatus — 邮件系统状态\n"
)
await update.message.reply_text(text, parse_mode="HTML")
@restricted
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/status — 查看任务状态"""
with _task_lock:
if _task_running:
await update.message.reply_text(
f"⏳ 当前任务:{_task_name}",
parse_mode="HTML",
)
else:
await update.message.reply_text("✅ 当前无运行中的任务。")
@restricted
async def cmd_stop(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/stop — 中断当前运行的任务"""
with _task_lock:
if not _task_running:
await update.message.reply_text("✅ 当前没有运行中的任务。")
return
_stop_event.set()
await update.message.reply_text(
f"⏹ 正在停止任务:{_task_name}\n"
"任务将在当前步骤完成后安全退出...",
parse_mode="HTML",
)
@restricted
async def cmd_mailstatus(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/mailstatus — 检查邮箱系统连通性"""
status_msg = await update.message.reply_text("🔍 正在检测邮件系统...")
mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
await _edit_or_send(status_msg, "❌ 没有可用的邮箱系统!请检查 config.toml 配置。")
return
text = f"📬 邮件系统状态(共 {mail_pool.count} 个)\n\n"
for i, ms in enumerate(mail_pool.systems, 1):
health = ms.check_health()
icon = "✅" if health["ok"] else "❌"
domains = ", ".join(ms.domains)
text += f"{i}. {icon} {ms.base_url}\n"
text += f" 域名: {domains}\n"
text += f" 状态: {health['message']}\n"
await _edit_or_send(status_msg, text)
@restricted
async def cmd_proxytest(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/proxytest — 测试所有代理的连通性"""
pp = proxy_pool.pool
if pp.count == 0:
await update.message.reply_text("❌ 代理池为空(proxy.txt 不存在或无有效代理)")
return
status_msg = await update.message.reply_text(
f"🔍 正在测试 {pp.count} 个代理...(可能需要一些时间)"
)
# 在线程中执行测试避免阻塞事件循环
loop = asyncio.get_event_loop()
results = await loop.run_in_executor(None, pp.test_all)
# 构建结果报告
ok_count = sum(1 for r in results if r["ok"])
fail_count = len(results) - ok_count
text = (
f"🎯 代理测试结果\n\n"
f"✅ 通过: {ok_count} ❌ 失败: {fail_count} "
f"🚧 剩余可用: {pp.active_count}\n\n"
)
for r in results:
icon = "✅" if r["ok"] else "❌"
latency = f"{r['latency_ms']}ms" if r['latency_ms'] > 0 else "-"
prio = r.get('priority', '-')
text += f"{icon} {r['proxy']}\n"
text += f" 延迟: {latency} | 优先级: {prio}\n"
if not r["ok"]:
text += f" 错误: {r.get('error', '?')}\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await _edit_or_send(status_msg, text)
@restricted
async def cmd_proxystatus(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/proxystatus — 查看代理池状态"""
pp = proxy_pool.pool
if pp.count == 0:
await update.message.reply_text("❌ 代理池为空(proxy.txt 不存在或无有效代理)")
return
items = pp.status_list()
text = f"🌐 代理池状态(共 {len(items)} 个)\n\n"
for i, item in enumerate(items, 1):
icon = "✅" if item["last_ok"] else "❌"
text += (
f"{i}. {icon} {item['proxy']}\n"
f" 优先级: {item['priority']} | "
f"延迟: {item['latency_ms']}ms | "
f"✅{item['success']} ❌{item['fail']}\n"
)
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await update.message.reply_text(text, parse_mode="HTML")
@restricted
async def cmd_proxy(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/proxy on|off — 开关代理"""
pp = proxy_pool.pool
if not context.args:
# 无参数:显示当前状态
status = "✅ 已开启" if pp.enabled else "❌ 已关闭"
await update.message.reply_text(
f"🌐 代理状态: {status}\n"
f"📦 代理池: {pp.count} 个(活跃 {pp.active_count} 个)\n\n"
f"用法: /proxy on 或 /proxy off",
parse_mode="HTML",
)
return
arg = context.args[0].lower()
if arg == "on":
pp.enabled = True
await update.message.reply_text(
f"✅ 代理已开启(池中 {pp.active_count} 个可用)"
)
elif arg == "off":
pp.enabled = False
await update.message.reply_text("❌ 代理已关闭,所有请求将直连")
else:
await update.message.reply_text("❌ 用法: /proxy on 或 /proxy off")
@restricted
async def cmd_accounts(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/accounts — 列出已注册账号"""
accounts = account_store.read_all()
if not accounts:
await update.message.reply_text("📭 尚无已注册账号。")
return
text = f"📋 已注册账号(共 {len(accounts)} 个)\n\n"
for i, acc in enumerate(accounts, 1):
text += f"{i}. {acc['email']}\n SK: {acc['session_key']}\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断,请点击下方按钮导出完整数据)"
keyboard = [[InlineKeyboardButton("📥 导出 JSON 文件", callback_data="export_accounts_json")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(text, parse_mode="HTML", reply_markup=reply_markup)
@restricted
async def cmd_delete(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/delete <序号|邮箱> — 删除指定账号"""
user_id = update.effective_user.id
if not context.args:
await update.message.reply_text(
"❌ 用法:\n"
" /delete 3 — 按序号删除\n"
" /delete user@example.com — 按邮箱删除\n\n"
"💡 使用 /accounts 查看序号",
parse_mode="HTML",
)
return
arg = context.args[0]
removed = None
# 尝试按序号删除
try:
index = int(arg)
removed = account_store.delete_by_index(index)
if not removed:
total = account_store.count()
await update.message.reply_text(f"❌ 无效序号。当前共 {total} 个账号。")
return
except ValueError:
# 按邮箱删除
removed = account_store.delete_by_email(arg)
if not removed:
await update.message.reply_text(f"❌ 未找到邮箱:{arg}")
return
await update.message.reply_text(
f"🗑 已删除账号:\n"
f"📧 {removed['email']}\n"
f"剩余 {account_store.count()} 个账号",
parse_mode="HTML",
)
@restricted
async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/verify — 检测已保存的 Session Key 是否仍然有效"""
accounts = account_store.read_all()
if not accounts:
await update.message.reply_text("📭 尚无已注册账号。")
return
status_msg = await update.message.reply_text(
f"🔍 正在验证 {len(accounts)} 个账号的 Session Key..."
)
from curl_cffi import requests as cffi_requests
from config import get_proxy
results = []
for i, acc in enumerate(accounts, 1):
sk = acc.get("session_key", "")
email = acc.get("email", "?")
if not sk:
results.append({"email": email, "ok": False, "reason": "SK 为空"})
continue
try:
resp = cffi_requests.get(
"https://claude.ai/api/organizations",
headers={
"Cookie": f"sessionKey={sk}",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
},
impersonate="chrome124",
proxies=get_proxy(),
timeout=15,
)
if resp.status_code == 200:
results.append({"email": email, "ok": True, "reason": "有效"})
elif resp.status_code == 401:
results.append({"email": email, "ok": False, "reason": "已过期"})
elif resp.status_code == 403:
results.append({"email": email, "ok": False, "reason": "被封禁"})
else:
results.append({"email": email, "ok": False, "reason": f"HTTP {resp.status_code}"})
except Exception as e:
results.append({"email": email, "ok": False, "reason": str(e)[:50]})
valid = sum(1 for r in results if r["ok"])
invalid = len(results) - valid
text = (
f"🔑 账号验证结果\n\n"
f"✅ 有效: {valid} ❌ 无效: {invalid}\n\n"
)
for i, r in enumerate(results, 1):
icon = "✅" if r["ok"] else "❌"
text += f"{i}. {icon} {r['email']} — {r['reason']}\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await _edit_or_send(status_msg, text)
@restricted
async def cmd_stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/stats — 展示历史统计数据"""
stats = account_store.get_stats()
reg_total = stats.get("register_total", 0)
reg_ok = stats.get("register_success", 0)
reg_fail = stats.get("register_fail", 0)
reg_rate = f"{round(reg_ok / reg_total * 100)}%" if reg_total > 0 else "-"
cc_total = stats.get("cc_total", 0)
cc_ok = stats.get("cc_pass", 0)
cc_fail = stats.get("cc_fail", 0)
cc_rate = f"{round(cc_ok / cc_total * 100)}%" if cc_total > 0 else "-"
text = (
"📊 统计面板\n\n"
f"📝 注册统计\n"
f" 总计: {reg_total} | ✅ {reg_ok} | ❌ {reg_fail}\n"
f" 成功率: {reg_rate}\n"
)
# 失败原因分布
reasons = stats.get("register_fail_reasons", {})
if reasons:
text += " 失败原因:\n"
for reason, cnt in sorted(reasons.items(), key=lambda x: -x[1]):
text += f" • {reason}: {cnt}\n"
text += (
f"\n💳 CC 检查统计\n"
f" 总计: {cc_total} | ✅ {cc_ok} | ❌ {cc_fail}\n"
f" 通过率: {cc_rate}\n"
)
text += f"\n📦 当前账号数: {account_store.count()}"
await update.message.reply_text(text, parse_mode="HTML")
# ============================================================
# /register — 注册 Claude 账号
# ============================================================
@restricted
async def cmd_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/register [N] — 注册 Claude 账号"""
# 解析数量
count = 1
if context.args:
try:
count = int(context.args[0])
if count < 1 or count > 20:
await update.message.reply_text("❌ 数量范围:1-20")
return
except ValueError:
await update.message.reply_text("❌ 用法:/register [数量]")
return
if not _set_task(f"注册 {count} 个账号"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
status_msg = await update.message.reply_text(
f"🚀 开始注册 {count} 个 Claude 账号...",
parse_mode="HTML",
)
# 在后台线程执行耗时操作
loop = asyncio.get_event_loop()
threading.Thread(
target=_register_worker,
args=(loop, status_msg, count),
daemon=True,
).start()
def _register_worker(loop: asyncio.AbstractEventLoop, status_msg, count: int):
"""注册工作线程"""
results = {"success": 0, "fail": 0, "accounts": [], "fail_reasons": []}
try:
# 初始化邮箱系统池
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "📧 正在连接邮件系统..."),
loop,
).result(timeout=10)
mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "❌ 没有可用的邮箱系统!"),
loop,
).result(timeout=10)
return
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"📧 邮箱系统就绪({mail_pool.count} 个)\n🚀 开始注册..."),
loop,
).result(timeout=10)
for i in range(count):
bar = _progress_bar(i, count)
step_header = f"📊 {bar}\n"
# Step 1: 创建邮箱(轮询选系统)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 创建临时邮箱...\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
# 检查停止信号
if _is_stopped():
break
random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
target_email, mail_sys = mail_pool.create_user(random_prefix)
if not target_email or not mail_sys:
results["fail"] += 1
reason = "邮箱创建失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
# 检查停止信号
if _is_stopped():
break
# Step 2: 发送 Magic Link
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 发送 Magic Link...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
if _is_stopped():
break
if not attack_claude(target_email):
results["fail"] += 1
reason = "Magic Link 发送失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
# Step 3: 等待邮件(使用创建时对应的系统)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 等待 Claude 邮件...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
email_content = mail_sys.wait_for_email(target_email, stop_check=_is_stopped)
if _is_stopped():
break
if not email_content:
results["fail"] += 1
reason = "邮件接收超时"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
magic_link = extract_magic_link(email_content)
if not magic_link:
results["fail"] += 1
reason = "Magic Link 解析失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
# Step 4: 交换 SessionKey
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 交换 SessionKey...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
if _is_stopped():
break
account = finalize_login(magic_link)
if account:
results["success"] += 1
results["accounts"].append(account)
account_store.append(account.email, account.session_key, account.org_uuid)
account_store.record_register(True)
else:
results["fail"] += 1
reason = "SessionKey 交换失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
# 间隔防止限流
if i < count - 1:
time.sleep(2)
# 最终汇报
stopped = _is_stopped()
done_bar = _progress_bar(results["success"] + results["fail"], count)
title = "⏹ 注册已中断" if stopped else "🏁 注册完成"
report = (
f"{title}\n"
f"📊 {done_bar}\n\n"
f"✅ 成功: {results['success']}\n"
f"❌ 失败: {results['fail']}\n"
)
# 失败原因汇总
if results["fail_reasons"]:
from collections import Counter
reason_counts = Counter(results["fail_reasons"])
report += "\n失败原因:\n"
for reason, cnt in reason_counts.most_common():
report += f" • {reason}: {cnt} 次\n"
if results["accounts"]:
report += "\n新注册账号:\n"
for acc in results["accounts"]:
report += (
f"• {acc.email}\n"
f" SK: {acc.session_key}\n"
)
# Telegram 消息限制 4096 字符
if len(report) > 4000:
report = report[:4000] + "\n...(已截断,使用 /accounts 查看完整列表)"
keyboard = [[InlineKeyboardButton("📥 导出 JSON 文件", callback_data="export_accounts_json")]]
reply_markup = InlineKeyboardMarkup(keyboard)
async def _send_final():
try:
await status_msg.edit_text(report, parse_mode="HTML", reply_markup=reply_markup)
except Exception:
pass
asyncio.run_coroutine_threadsafe(
_send_final(),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("注册任务异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 注册任务异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# /check — CC 检查
# ============================================================
@restricted
async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/check — CC 检查"""
if not context.args:
await update.message.reply_text(
"❌ 用法:/check 卡号|月|年|CVC\n"
"示例:/check 4111111111111111|12|2025|123",
parse_mode="HTML",
)
return
card_line = context.args[0]
parts = card_line.split("|")
if len(parts) != 4:
await update.message.reply_text("❌ 格式错误,需要:卡号|月|年|CVC", parse_mode="HTML")
return
# 读取可用账号
acc_lines = account_store.read_lines()
if not acc_lines:
await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。")
return
if not _set_task("CC 检查"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
status_msg = await update.message.reply_text(
f"🔍 正在检查卡片:{parts[0][:4]}****{parts[0][-4:]}",
parse_mode="HTML",
)
loop = asyncio.get_event_loop()
threading.Thread(
target=_check_worker,
args=(loop, status_msg, card_line, acc_lines[-1]),
daemon=True,
).start()
def _check_worker(loop: asyncio.AbstractEventLoop, status_msg, card_line: str, account_line: str):
"""CC 检查工作线程"""
try:
cc, mm, yy, cvc = card_line.split("|")
acc_parts = account_line.split("|")
email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2]
from models import ClaudeAccount
from identity import random_ua
account = ClaudeAccount(email, session_key, org_uuid, random_ua())
masked = f"{cc[:4]}****{cc[-4:]}"
# Step 1: Stripe Token
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 {masked}\n⏳ 获取 Stripe Token..."),
loop,
).result(timeout=10)
tokenizer = StripeTokenizer(account.user_agent)
pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if not pm_id:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 {masked}\n❌ Stripe 拒绝,无法获取 Token"),
loop,
).result(timeout=10)
return
# Step 2: Gift Purchase
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 {masked}\n⏳ 尝试扣款验证..."),
loop,
).result(timeout=10)
checker = GiftChecker(account)
result = checker.purchase(pm_id)
# 结果映射
result_map = {
"LIVE": "💰 LIVE — 扣款成功!卡有效",
"DECLINED": "🚫 DECLINED — 被拒绝",
"INSUFFICIENT_FUNDS": "💸 INSUFFICIENT — 余额不足(卡有效)",
"CCN_LIVE": "🔶 CCN LIVE — 卡号有效但 CVC 错误",
"DEAD": "💀 DEAD — 无效卡",
"ERROR": "⚠️ ERROR — 检查出错",
}
result_text = result_map.get(result, f"❓ 未知结果:{result}")
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"🔍 CC 检查结果\n\n"
f"卡片:{masked}\n"
f"结果:{result_text}",
),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("CC 检查异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 CC 检查异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# 文件上传 — 批量 CC 检查
# ============================================================
@restricted
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""接收 .txt 文件进行批量 CC 检查"""
doc = update.message.document
# 检查文件类型
if not doc.file_name.endswith(".txt"):
await update.message.reply_text("❌ 仅支持 .txt 文件")
return
# 检查文件大小(限制 1MB)
if doc.file_size > 1024 * 1024:
await update.message.reply_text("❌ 文件太大(最大 1MB)")
return
# 读取可用账号
acc_lines = account_store.read_lines()
if not acc_lines:
await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。")
return
if not _set_task("批量 CC 检查"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
# 下载文件
status_msg = await update.message.reply_text("📥 正在下载文件...")
try:
file = await doc.get_file()
file_bytes = await file.download_as_bytearray()
content = file_bytes.decode("utf-8", errors="ignore")
except Exception as e:
await _edit_or_send(status_msg, f"💥 下载文件失败:{e}")
_clear_task()
return
# 解析卡片
cards = []
for line in content.splitlines():
line = line.strip()
if line and not line.startswith("#"):
parts = line.split("|")
if len(parts) == 4:
cards.append(line)
if not cards:
await _edit_or_send(status_msg, "❌ 文件中没有找到有效卡片。\n格式:卡号|月|年|CVC")
_clear_task()
return
await _edit_or_send(
status_msg,
f"📋 读取到 {len(cards)} 张卡片,开始批量检查...",
)
# 后台线程执行
loop = asyncio.get_event_loop()
threading.Thread(
target=_batch_check_worker,
args=(loop, status_msg, cards, acc_lines[-1]),
daemon=True,
).start()
def _batch_check_worker(loop: asyncio.AbstractEventLoop, status_msg, cards: list, account_line: str):
"""批量 CC 检查工作线程"""
from models import ClaudeAccount
from identity import random_ua
results = []
total = len(cards)
try:
acc_parts = account_line.split("|")
email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2]
account = ClaudeAccount(email, session_key, org_uuid, random_ua())
tokenizer = StripeTokenizer(account.user_agent)
checker = GiftChecker(account)
for i, card_line in enumerate(cards):
if _is_stopped():
break
cc, mm, yy, cvc = card_line.split("|")
masked = f"{cc[:4]}****{cc[-4:]}"
# 更新进度
recent = "\n".join(results[-5:]) # 显示最近 5 条结果
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"🔍 [{i + 1}/{total}] 检查中:{masked}\n\n"
+ recent,
),
loop,
).result(timeout=10)
# Stripe Token
pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if not pm_id:
results.append(f"❌ {masked} → Stripe 拒绝")
time.sleep(1)
continue
# Gift Purchase
result = checker.purchase(pm_id)
result_icons = {
"LIVE": "💰", "DECLINED": "🚫",
"INSUFFICIENT_FUNDS": "💸", "CCN_LIVE": "🔶",
"DEAD": "💀", "ERROR": "⚠️",
}
icon = result_icons.get(result, "❓")
results.append(f"{icon} {masked} → {result}")
# 间隔防限流
if i < total - 1:
time.sleep(2)
# 最终汇报
live = sum(1 for r in results if "LIVE" in r and "CCN" not in r)
dead = sum(1 for r in results if "DEAD" in r or "DECLINED" in r or "Stripe" in r)
other = total - live - dead
report = (
f"🏁 批量 CC 检查完成\n\n"
f"📊 共 {total} 张 | 💰 有效 {live} | 💀 无效 {dead} | ❓ 其他 {other}\n\n"
)
report += "\n".join(results)
# Telegram 消息限制 4096
if len(report) > 4000:
report = report[:4000] + "\n...(已截断)"
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, report),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("批量 CC 检查异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 批量 CC 检查异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# 回调处理 — 导出 JSON
# ============================================================
async def callback_export_json(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理导出 JSON 文件的回调"""
query = update.callback_query
await query.answer()
accounts = account_store.read_all()
if not accounts:
await query.message.reply_text("📭 没有账号数据。")
return
json_data = json.dumps(accounts, indent=2, ensure_ascii=False)
json_path = "accounts_export.json"
with open(json_path, "w", encoding="utf-8") as f:
f.write(json_data)
with open(json_path, "rb") as f:
await query.message.reply_document(
document=f,
filename="accounts.json",
caption=f"📋 共 {len(accounts)} 个账号",
)
# 清理临时文件
try:
os.remove(json_path)
except OSError:
pass
# ============================================================
# 启动 Bot
# ============================================================
async def post_init(application: Application):
"""Bot 启动后设置命令菜单"""
commands = [
BotCommand("start", "欢迎信息"),
BotCommand("register", "注册 Claude 账号 [数量]"),
BotCommand("stop", "中断当前任务"),
BotCommand("check", "CC 检查"),
BotCommand("accounts", "查看账号"),
BotCommand("delete", "删除账号"),
BotCommand("verify", "验证 SK 有效性"),
BotCommand("stats", "统计面板"),
BotCommand("mailstatus", "邮件系统状态"),
BotCommand("proxy", "代理开关 on/off"),
BotCommand("proxytest", "测试代理"),
BotCommand("proxystatus", "代理池状态"),
BotCommand("status", "任务状态"),
BotCommand("help", "命令帮助"),
]
await application.bot.set_my_commands(commands)
logger.info("Bot 命令菜单已设置")
def main():
"""启动 Bot"""
if TG_BOT_TOKEN == "your_bot_token_here":
print("❌ 请先在 config.toml 中设置 TG_BOT_TOKEN!")
return
app = Application.builder().token(TG_BOT_TOKEN).post_init(post_init).build()
# 注册命令
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("help", cmd_help))
app.add_handler(CommandHandler("register", cmd_register))
app.add_handler(CommandHandler("stop", cmd_stop))
app.add_handler(CommandHandler("check", cmd_check))
app.add_handler(CommandHandler("accounts", cmd_accounts))
app.add_handler(CommandHandler("delete", cmd_delete))
app.add_handler(CommandHandler("verify", cmd_verify))
app.add_handler(CommandHandler("stats", cmd_stats))
app.add_handler(CommandHandler("mailstatus", cmd_mailstatus))
app.add_handler(CommandHandler("proxytest", cmd_proxytest))
app.add_handler(CommandHandler("proxystatus", cmd_proxystatus))
app.add_handler(CommandHandler("proxy", cmd_proxy))
app.add_handler(CommandHandler("status", cmd_status))
app.add_handler(CallbackQueryHandler(callback_export_json, pattern="^export_accounts_json$"))
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
logger.info("🤖 Bot 启动中...")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()