feat: Implement Telegram bot with Claude authentication, mail service, and identity spoofing, refactoring core logic into new modules.

This commit is contained in:
2026-02-12 22:16:17 +08:00
parent 02d70ac3cd
commit ad7b6196dc
14 changed files with 1577 additions and 470 deletions

658
bot.py Normal file
View File

@@ -0,0 +1,658 @@
"""
autoClaude Telegram Bot
基于 python-telegram-bot v21+ 的异步 Bot封装 Claude 注册和 CC 检查流程。
启动方式: uv run python bot.py
"""
import asyncio
import logging
import random
import string
import time
import threading
from functools import wraps
from telegram import Update, BotCommand
from telegram.ext import (
Application,
CommandHandler,
MessageHandler,
ContextTypes,
filters,
)
from config import (
TG_BOT_TOKEN,
TG_ALLOWED_USERS,
MAIL_SYSTEMS,
)
from mail_service import MailPool, extract_magic_link
from stripe_token import StripeTokenizer
from gift_checker import GiftChecker
from claude_auth import attack_claude, finalize_login
# --- 日志配置 ---
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
# --- 全局状态 ---
_task_lock = threading.Lock()
_task_running = False
_task_name = ""
# ============================================================
# 工具函数
# ============================================================
def restricted(func):
"""权限控制装饰器:仅允许 TG_ALLOWED_USERS 中的用户使用"""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
if TG_ALLOWED_USERS and user_id not in TG_ALLOWED_USERS:
await update.message.reply_text("⛔ 你没有权限使用此 Bot。")
return
return await func(update, context, *args, **kwargs)
return wrapper
def _set_task(name: str) -> bool:
"""尝试设置任务锁,返回是否成功"""
global _task_running, _task_name
with _task_lock:
if _task_running:
return False
_task_running = True
_task_name = name
return True
def _clear_task():
"""释放任务锁"""
global _task_running, _task_name
with _task_lock:
_task_running = False
_task_name = ""
async def _edit_or_send(msg, text: str):
"""安全地编辑消息,如果失败则发送新消息"""
try:
await msg.edit_text(text, parse_mode="HTML")
except Exception:
pass
# ============================================================
# 命令处理
# ============================================================
@restricted
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/start — 欢迎信息"""
welcome = (
"🤖 <b>autoClaude Bot</b>\n\n"
"可用命令:\n"
" /register [N] — 注册 Claude 账号(默认 1 个)\n"
" /check &lt;卡号|月|年|CVC&gt; — 单张 CC 检查\n"
" 📎 发送 .txt 文件 — 批量 CC 检查\n"
" /accounts — 查看已注册账号\n"
" /status — 当前任务状态\n"
" /help — 帮助\n\n"
f"👤 你的用户 ID: <code>{update.effective_user.id}</code>"
)
await update.message.reply_text(welcome, parse_mode="HTML")
@restricted
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/help — 命令列表"""
text = (
"📖 <b>命令说明</b>\n\n"
"<b>/register [N]</b>\n"
" 注册 N 个 Claude 账号(默认 1\n"
" 流程:创建邮箱 → 发送 Magic Link → 等待邮件 → 交换 SessionKey\n"
" 注册结果自动保存到 accounts.txt\n\n"
"<b>/check &lt;CARD|MM|YY|CVC&gt;</b>\n"
" 单张信用卡检查。\n\n"
"<b>📎 发送 .txt 文件</b>\n"
" 批量 CC 检查,文件每行一张卡:\n"
" <code>卡号|月|年|CVC</code>\n\n"
"<b>/accounts</b>\n"
" 列出 accounts.txt 中保存的所有账号。\n\n"
"<b>/status</b>\n"
" 查看当前是否有后台任务在运行。\n"
)
await update.message.reply_text(text, parse_mode="HTML")
@restricted
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/status — 查看任务状态"""
with _task_lock:
if _task_running:
await update.message.reply_text(
f"⏳ 当前任务:<b>{_task_name}</b>",
parse_mode="HTML",
)
else:
await update.message.reply_text("✅ 当前无运行中的任务。")
@restricted
async def cmd_accounts(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/accounts — 列出已注册账号"""
try:
with open("accounts.txt", "r") as f:
lines = [l.strip() for l in f if l.strip()]
except FileNotFoundError:
await update.message.reply_text("📭 尚无已注册账号accounts.txt 不存在)。")
return
if not lines:
await update.message.reply_text("📭 accounts.txt 为空。")
return
text = f"📋 <b>已注册账号(共 {len(lines)} 个)</b>\n\n"
for i, line in enumerate(lines, 1):
parts = line.split("|")
email = parts[0] if len(parts) > 0 else "?"
sk = parts[1][:12] + "..." if len(parts) > 1 and len(parts[1]) > 12 else (parts[1] if len(parts) > 1 else "?")
text += f"{i}. <code>{email}</code>\n SK: <code>{sk}</code>\n"
# Telegram 消息限制 4096 字符
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await update.message.reply_text(text, parse_mode="HTML")
# ============================================================
# /register — 注册 Claude 账号
# ============================================================
@restricted
async def cmd_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/register [N] — 注册 Claude 账号"""
# 解析数量
count = 1
if context.args:
try:
count = int(context.args[0])
if count < 1 or count > 20:
await update.message.reply_text("❌ 数量范围1-20")
return
except ValueError:
await update.message.reply_text("❌ 用法:/register [数量]")
return
if not _set_task(f"注册 {count} 个账号"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
status_msg = await update.message.reply_text(
f"🚀 开始注册 {count} 个 Claude 账号...",
parse_mode="HTML",
)
# 在后台线程执行耗时操作
loop = asyncio.get_event_loop()
threading.Thread(
target=_register_worker,
args=(loop, status_msg, count),
daemon=True,
).start()
def _register_worker(loop: asyncio.AbstractEventLoop, status_msg, count: int):
"""注册工作线程"""
results = {"success": 0, "fail": 0, "accounts": []}
try:
# 初始化邮箱系统池
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "📧 正在连接邮件系统..."),
loop,
).result(timeout=10)
mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "❌ 没有可用的邮箱系统!"),
loop,
).result(timeout=10)
return
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"📧 邮箱系统就绪({mail_pool.count} 个)\n🚀 开始注册..."),
loop,
).result(timeout=10)
for i in range(count):
progress = f"[{i + 1}/{count}]"
# Step 1: 创建邮箱(轮询选系统)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{progress} 创建临时邮箱...\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
target_email, mail_sys = mail_pool.create_user(random_prefix)
if not target_email or not mail_sys:
results["fail"] += 1
continue
# Step 2: 发送 Magic Link
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{progress} 发送 Magic Link...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
if not attack_claude(target_email):
results["fail"] += 1
continue
# Step 3: 等待邮件(使用创建时对应的系统)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{progress} 等待 Claude 邮件...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
email_content = mail_sys.wait_for_email(target_email)
if not email_content:
results["fail"] += 1
continue
magic_link = extract_magic_link(email_content)
if not magic_link:
results["fail"] += 1
continue
# Step 4: 交换 SessionKey
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{progress} 交换 SessionKey...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
account = finalize_login(magic_link)
if account:
results["success"] += 1
results["accounts"].append(account)
# 保存到文件
with open("accounts.txt", "a") as f:
f.write(f"{account.email}|{account.session_key}|{account.org_uuid}\n")
else:
results["fail"] += 1
# 间隔防止限流
if i < count - 1:
time.sleep(2)
# 最终汇报
report = (
f"🏁 <b>注册完成</b>\n\n"
f"✅ 成功: {results['success']}\n"
f"❌ 失败: {results['fail']}\n"
)
if results["accounts"]:
report += "\n<b>新注册账号:</b>\n"
for acc in results["accounts"]:
report += f"• <code>{acc.email}</code>\n"
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, report),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("注册任务异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 注册任务异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# /check — CC 检查
# ============================================================
@restricted
async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/check <CARD|MM|YY|CVC> — CC 检查"""
if not context.args:
await update.message.reply_text(
"❌ 用法:/check <code>卡号|月|年|CVC</code>\n"
"示例:/check 4111111111111111|12|2025|123",
parse_mode="HTML",
)
return
card_line = context.args[0]
parts = card_line.split("|")
if len(parts) != 4:
await update.message.reply_text("❌ 格式错误,需要:<code>卡号|月|年|CVC</code>", parse_mode="HTML")
return
# 读取可用账号
try:
with open("accounts.txt", "r") as f:
lines = [l.strip() for l in f if l.strip()]
except FileNotFoundError:
lines = []
if not lines:
await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。")
return
if not _set_task("CC 检查"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
status_msg = await update.message.reply_text(
f"🔍 正在检查卡片:<code>{parts[0][:4]}****{parts[0][-4:]}</code>",
parse_mode="HTML",
)
loop = asyncio.get_event_loop()
threading.Thread(
target=_check_worker,
args=(loop, status_msg, card_line, lines[-1]),
daemon=True,
).start()
def _check_worker(loop: asyncio.AbstractEventLoop, status_msg, card_line: str, account_line: str):
"""CC 检查工作线程"""
try:
cc, mm, yy, cvc = card_line.split("|")
acc_parts = account_line.split("|")
email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2]
from models import ClaudeAccount
from identity import random_ua
account = ClaudeAccount(email, session_key, org_uuid, random_ua())
masked = f"{cc[:4]}****{cc[-4:]}"
# Step 1: Stripe Token
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 <code>{masked}</code>\n⏳ 获取 Stripe Token..."),
loop,
).result(timeout=10)
tokenizer = StripeTokenizer(account.user_agent)
pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if not pm_id:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 <code>{masked}</code>\n❌ Stripe 拒绝,无法获取 Token"),
loop,
).result(timeout=10)
return
# Step 2: Gift Purchase
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 <code>{masked}</code>\n⏳ 尝试扣款验证..."),
loop,
).result(timeout=10)
checker = GiftChecker(account)
result = checker.purchase(pm_id)
# 结果映射
result_map = {
"LIVE": "💰 LIVE — 扣款成功!卡有效",
"DECLINED": "🚫 DECLINED — 被拒绝",
"INSUFFICIENT_FUNDS": "💸 INSUFFICIENT — 余额不足(卡有效)",
"CCN_LIVE": "🔶 CCN LIVE — 卡号有效但 CVC 错误",
"DEAD": "💀 DEAD — 无效卡",
"ERROR": "⚠️ ERROR — 检查出错",
}
result_text = result_map.get(result, f"❓ 未知结果:{result}")
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"🔍 <b>CC 检查结果</b>\n\n"
f"卡片:<code>{masked}</code>\n"
f"结果:{result_text}",
),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("CC 检查异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 CC 检查异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# 文件上传 — 批量 CC 检查
# ============================================================
@restricted
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""接收 .txt 文件进行批量 CC 检查"""
doc = update.message.document
# 检查文件类型
if not doc.file_name.endswith(".txt"):
await update.message.reply_text("❌ 仅支持 .txt 文件")
return
# 检查文件大小(限制 1MB
if doc.file_size > 1024 * 1024:
await update.message.reply_text("❌ 文件太大(最大 1MB")
return
# 读取可用账号
try:
with open("accounts.txt", "r") as f:
acc_lines = [l.strip() for l in f if l.strip()]
except FileNotFoundError:
acc_lines = []
if not acc_lines:
await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。")
return
if not _set_task("批量 CC 检查"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
# 下载文件
status_msg = await update.message.reply_text("📥 正在下载文件...")
try:
file = await doc.get_file()
file_bytes = await file.download_as_bytearray()
content = file_bytes.decode("utf-8", errors="ignore")
except Exception as e:
await _edit_or_send(status_msg, f"💥 下载文件失败:{e}")
_clear_task()
return
# 解析卡片
cards = []
for line in content.splitlines():
line = line.strip()
if line and not line.startswith("#"):
parts = line.split("|")
if len(parts) == 4:
cards.append(line)
if not cards:
await _edit_or_send(status_msg, "❌ 文件中没有找到有效卡片。\n格式:<code>卡号|月|年|CVC</code>")
_clear_task()
return
await _edit_or_send(
status_msg,
f"📋 读取到 <b>{len(cards)}</b> 张卡片,开始批量检查...",
)
# 后台线程执行
loop = asyncio.get_event_loop()
threading.Thread(
target=_batch_check_worker,
args=(loop, status_msg, cards, acc_lines[-1]),
daemon=True,
).start()
def _batch_check_worker(loop: asyncio.AbstractEventLoop, status_msg, cards: list, account_line: str):
"""批量 CC 检查工作线程"""
from models import ClaudeAccount
from identity import random_ua
results = []
total = len(cards)
try:
acc_parts = account_line.split("|")
email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2]
account = ClaudeAccount(email, session_key, org_uuid, random_ua())
tokenizer = StripeTokenizer(account.user_agent)
checker = GiftChecker(account)
for i, card_line in enumerate(cards):
cc, mm, yy, cvc = card_line.split("|")
masked = f"{cc[:4]}****{cc[-4:]}"
# 更新进度
recent = "\n".join(results[-5:]) # 显示最近 5 条结果
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"🔍 [{i + 1}/{total}] 检查中:<code>{masked}</code>\n\n"
+ recent,
),
loop,
).result(timeout=10)
# Stripe Token
pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if not pm_id:
results.append(f"❌ <code>{masked}</code> → Stripe 拒绝")
time.sleep(1)
continue
# Gift Purchase
result = checker.purchase(pm_id)
result_icons = {
"LIVE": "💰", "DECLINED": "🚫",
"INSUFFICIENT_FUNDS": "💸", "CCN_LIVE": "🔶",
"DEAD": "💀", "ERROR": "⚠️",
}
icon = result_icons.get(result, "")
results.append(f"{icon} <code>{masked}</code> → {result}")
# 间隔防限流
if i < total - 1:
time.sleep(2)
# 最终汇报
live = sum(1 for r in results if "LIVE" in r and "CCN" not in r)
dead = sum(1 for r in results if "DEAD" in r or "DECLINED" in r or "Stripe" in r)
other = total - live - dead
report = (
f"🏁 <b>批量 CC 检查完成</b>\n\n"
f"📊 共 {total} 张 | 💰 有效 {live} | 💀 无效 {dead} | ❓ 其他 {other}\n\n"
)
report += "\n".join(results)
# Telegram 消息限制 4096
if len(report) > 4000:
report = report[:4000] + "\n...(已截断)"
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, report),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("批量 CC 检查异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 批量 CC 检查异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# 启动 Bot
# ============================================================
async def post_init(application: Application):
"""Bot 启动后设置命令菜单"""
commands = [
BotCommand("start", "欢迎信息"),
BotCommand("register", "注册 Claude 账号 [数量]"),
BotCommand("check", "CC 检查 <卡号|月|年|CVC>"),
BotCommand("accounts", "查看已注册账号"),
BotCommand("status", "当前任务状态"),
BotCommand("help", "命令帮助"),
]
await application.bot.set_my_commands(commands)
logger.info("Bot 命令菜单已设置")
def main():
"""启动 Bot"""
if TG_BOT_TOKEN == "your_bot_token_here":
print("❌ 请先在 config.toml 中设置 TG_BOT_TOKEN")
return
app = Application.builder().token(TG_BOT_TOKEN).post_init(post_init).build()
# 注册命令
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("help", cmd_help))
app.add_handler(CommandHandler("register", cmd_register))
app.add_handler(CommandHandler("check", cmd_check))
app.add_handler(CommandHandler("accounts", cmd_accounts))
app.add_handler(CommandHandler("status", cmd_status))
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
logger.info("🤖 Bot 启动中...")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()