""" autoClaude Telegram Bot 基于 python-telegram-bot v21+ 的异步 Bot,封装 Claude 注册和 CC 检查流程。 启动方式: uv run python bot.py """ import asyncio import logging import random import string import time import threading from functools import wraps from telegram import Update, BotCommand from telegram.ext import ( Application, CommandHandler, MessageHandler, ContextTypes, filters, ) from config import ( TG_BOT_TOKEN, TG_ALLOWED_USERS, MAIL_SYSTEMS, ) from mail_service import MailPool, extract_magic_link from stripe_token import StripeTokenizer from gift_checker import GiftChecker from claude_auth import attack_claude, finalize_login # --- 日志配置 --- logging.basicConfig( format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", level=logging.INFO, ) logger = logging.getLogger(__name__) # --- 全局状态 --- _task_lock = threading.Lock() _task_running = False _task_name = "" # ============================================================ # 工具函数 # ============================================================ def restricted(func): """权限控制装饰器:仅允许 TG_ALLOWED_USERS 中的用户使用""" @wraps(func) async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs): user_id = update.effective_user.id if TG_ALLOWED_USERS and user_id not in TG_ALLOWED_USERS: await update.message.reply_text("⛔ 你没有权限使用此 Bot。") return return await func(update, context, *args, **kwargs) return wrapper def _set_task(name: str) -> bool: """尝试设置任务锁,返回是否成功""" global _task_running, _task_name with _task_lock: if _task_running: return False _task_running = True _task_name = name return True def _clear_task(): """释放任务锁""" global _task_running, _task_name with _task_lock: _task_running = False _task_name = "" async def _edit_or_send(msg, text: str): """安全地编辑消息,如果失败则发送新消息""" try: await msg.edit_text(text, parse_mode="HTML") except Exception: pass # ============================================================ # 命令处理 # ============================================================ @restricted async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE): """/start — 欢迎信息""" welcome = ( "🤖 autoClaude Bot\n\n" "可用命令:\n" " /register [N] — 注册 Claude 账号(默认 1 个)\n" " /check <卡号|月|年|CVC> — 单张 CC 检查\n" " 📎 发送 .txt 文件 — 批量 CC 检查\n" " /accounts — 查看已注册账号\n" " /status — 当前任务状态\n" " /help — 帮助\n\n" f"👤 你的用户 ID: {update.effective_user.id}" ) await update.message.reply_text(welcome, parse_mode="HTML") @restricted async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE): """/help — 命令列表""" text = ( "📖 命令说明\n\n" "/register [N]\n" " 注册 N 个 Claude 账号(默认 1)。\n" " 流程:创建邮箱 → 发送 Magic Link → 等待邮件 → 交换 SessionKey\n" " 注册结果自动保存到 accounts.txt\n\n" "/check <CARD|MM|YY|CVC>\n" " 单张信用卡检查。\n\n" "📎 发送 .txt 文件\n" " 批量 CC 检查,文件每行一张卡:\n" " 卡号|月|年|CVC\n\n" "/accounts\n" " 列出 accounts.txt 中保存的所有账号。\n\n" "/status\n" " 查看当前是否有后台任务在运行。\n" ) await update.message.reply_text(text, parse_mode="HTML") @restricted async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE): """/status — 查看任务状态""" with _task_lock: if _task_running: await update.message.reply_text( f"⏳ 当前任务:{_task_name}", parse_mode="HTML", ) else: await update.message.reply_text("✅ 当前无运行中的任务。") @restricted async def cmd_accounts(update: Update, context: ContextTypes.DEFAULT_TYPE): """/accounts — 列出已注册账号""" try: with open("accounts.txt", "r") as f: lines = [l.strip() for l in f if l.strip()] except FileNotFoundError: await update.message.reply_text("📭 尚无已注册账号(accounts.txt 不存在)。") return if not lines: await update.message.reply_text("📭 accounts.txt 为空。") return text = f"📋 已注册账号(共 {len(lines)} 个)\n\n" for i, line in enumerate(lines, 1): parts = line.split("|") email = parts[0] if len(parts) > 0 else "?" sk = parts[1][:12] + "..." if len(parts) > 1 and len(parts[1]) > 12 else (parts[1] if len(parts) > 1 else "?") text += f"{i}. {email}\n SK: {sk}\n" # Telegram 消息限制 4096 字符 if len(text) > 4000: text = text[:4000] + "\n...(已截断)" await update.message.reply_text(text, parse_mode="HTML") # ============================================================ # /register — 注册 Claude 账号 # ============================================================ @restricted async def cmd_register(update: Update, context: ContextTypes.DEFAULT_TYPE): """/register [N] — 注册 Claude 账号""" # 解析数量 count = 1 if context.args: try: count = int(context.args[0]) if count < 1 or count > 20: await update.message.reply_text("❌ 数量范围:1-20") return except ValueError: await update.message.reply_text("❌ 用法:/register [数量]") return if not _set_task(f"注册 {count} 个账号"): await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}") return status_msg = await update.message.reply_text( f"🚀 开始注册 {count} 个 Claude 账号...", parse_mode="HTML", ) # 在后台线程执行耗时操作 loop = asyncio.get_event_loop() threading.Thread( target=_register_worker, args=(loop, status_msg, count), daemon=True, ).start() def _register_worker(loop: asyncio.AbstractEventLoop, status_msg, count: int): """注册工作线程""" results = {"success": 0, "fail": 0, "accounts": []} try: # 初始化邮箱系统池 asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, "📧 正在连接邮件系统..."), loop, ).result(timeout=10) mail_pool = MailPool(MAIL_SYSTEMS) if mail_pool.count == 0: asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, "❌ 没有可用的邮箱系统!"), loop, ).result(timeout=10) return asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"📧 邮箱系统就绪({mail_pool.count} 个)\n🚀 开始注册..."), loop, ).result(timeout=10) for i in range(count): progress = f"[{i + 1}/{count}]" # Step 1: 创建邮箱(轮询选系统) asyncio.run_coroutine_threadsafe( _edit_or_send( status_msg, f"⏳ {progress} 创建临时邮箱...\n\n" f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}", ), loop, ).result(timeout=10) random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10)) target_email, mail_sys = mail_pool.create_user(random_prefix) if not target_email or not mail_sys: results["fail"] += 1 continue # Step 2: 发送 Magic Link asyncio.run_coroutine_threadsafe( _edit_or_send( status_msg, f"⏳ {progress} 发送 Magic Link...\n" f"📧 {target_email}\n\n" f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}", ), loop, ).result(timeout=10) if not attack_claude(target_email): results["fail"] += 1 continue # Step 3: 等待邮件(使用创建时对应的系统) asyncio.run_coroutine_threadsafe( _edit_or_send( status_msg, f"⏳ {progress} 等待 Claude 邮件...\n" f"📧 {target_email}\n\n" f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}", ), loop, ).result(timeout=10) email_content = mail_sys.wait_for_email(target_email) if not email_content: results["fail"] += 1 continue magic_link = extract_magic_link(email_content) if not magic_link: results["fail"] += 1 continue # Step 4: 交换 SessionKey asyncio.run_coroutine_threadsafe( _edit_or_send( status_msg, f"⏳ {progress} 交换 SessionKey...\n" f"📧 {target_email}\n\n" f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}", ), loop, ).result(timeout=10) account = finalize_login(magic_link) if account: results["success"] += 1 results["accounts"].append(account) # 保存到文件 with open("accounts.txt", "a") as f: f.write(f"{account.email}|{account.session_key}|{account.org_uuid}\n") else: results["fail"] += 1 # 间隔防止限流 if i < count - 1: time.sleep(2) # 最终汇报 report = ( f"🏁 注册完成\n\n" f"✅ 成功: {results['success']}\n" f"❌ 失败: {results['fail']}\n" ) if results["accounts"]: report += "\n新注册账号:\n" for acc in results["accounts"]: report += f"• {acc.email}\n" asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, report), loop, ).result(timeout=10) except Exception as e: logger.exception("注册任务异常") asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"💥 注册任务异常:{e}"), loop, ) finally: _clear_task() # ============================================================ # /check — CC 检查 # ============================================================ @restricted async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE): """/check — CC 检查""" if not context.args: await update.message.reply_text( "❌ 用法:/check 卡号|月|年|CVC\n" "示例:/check 4111111111111111|12|2025|123", parse_mode="HTML", ) return card_line = context.args[0] parts = card_line.split("|") if len(parts) != 4: await update.message.reply_text("❌ 格式错误,需要:卡号|月|年|CVC", parse_mode="HTML") return # 读取可用账号 try: with open("accounts.txt", "r") as f: lines = [l.strip() for l in f if l.strip()] except FileNotFoundError: lines = [] if not lines: await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。") return if not _set_task("CC 检查"): await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}") return status_msg = await update.message.reply_text( f"🔍 正在检查卡片:{parts[0][:4]}****{parts[0][-4:]}", parse_mode="HTML", ) loop = asyncio.get_event_loop() threading.Thread( target=_check_worker, args=(loop, status_msg, card_line, lines[-1]), daemon=True, ).start() def _check_worker(loop: asyncio.AbstractEventLoop, status_msg, card_line: str, account_line: str): """CC 检查工作线程""" try: cc, mm, yy, cvc = card_line.split("|") acc_parts = account_line.split("|") email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2] from models import ClaudeAccount from identity import random_ua account = ClaudeAccount(email, session_key, org_uuid, random_ua()) masked = f"{cc[:4]}****{cc[-4:]}" # Step 1: Stripe Token asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"🔍 {masked}\n⏳ 获取 Stripe Token..."), loop, ).result(timeout=10) tokenizer = StripeTokenizer(account.user_agent) pm_id = tokenizer.get_token(cc, mm, yy, cvc) if not pm_id: asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"🔍 {masked}\n❌ Stripe 拒绝,无法获取 Token"), loop, ).result(timeout=10) return # Step 2: Gift Purchase asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"🔍 {masked}\n⏳ 尝试扣款验证..."), loop, ).result(timeout=10) checker = GiftChecker(account) result = checker.purchase(pm_id) # 结果映射 result_map = { "LIVE": "💰 LIVE — 扣款成功!卡有效", "DECLINED": "🚫 DECLINED — 被拒绝", "INSUFFICIENT_FUNDS": "💸 INSUFFICIENT — 余额不足(卡有效)", "CCN_LIVE": "🔶 CCN LIVE — 卡号有效但 CVC 错误", "DEAD": "💀 DEAD — 无效卡", "ERROR": "⚠️ ERROR — 检查出错", } result_text = result_map.get(result, f"❓ 未知结果:{result}") asyncio.run_coroutine_threadsafe( _edit_or_send( status_msg, f"🔍 CC 检查结果\n\n" f"卡片:{masked}\n" f"结果:{result_text}", ), loop, ).result(timeout=10) except Exception as e: logger.exception("CC 检查异常") asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"💥 CC 检查异常:{e}"), loop, ) finally: _clear_task() # ============================================================ # 文件上传 — 批量 CC 检查 # ============================================================ @restricted async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE): """接收 .txt 文件进行批量 CC 检查""" doc = update.message.document # 检查文件类型 if not doc.file_name.endswith(".txt"): await update.message.reply_text("❌ 仅支持 .txt 文件") return # 检查文件大小(限制 1MB) if doc.file_size > 1024 * 1024: await update.message.reply_text("❌ 文件太大(最大 1MB)") return # 读取可用账号 try: with open("accounts.txt", "r") as f: acc_lines = [l.strip() for l in f if l.strip()] except FileNotFoundError: acc_lines = [] if not acc_lines: await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。") return if not _set_task("批量 CC 检查"): await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}") return # 下载文件 status_msg = await update.message.reply_text("📥 正在下载文件...") try: file = await doc.get_file() file_bytes = await file.download_as_bytearray() content = file_bytes.decode("utf-8", errors="ignore") except Exception as e: await _edit_or_send(status_msg, f"💥 下载文件失败:{e}") _clear_task() return # 解析卡片 cards = [] for line in content.splitlines(): line = line.strip() if line and not line.startswith("#"): parts = line.split("|") if len(parts) == 4: cards.append(line) if not cards: await _edit_or_send(status_msg, "❌ 文件中没有找到有效卡片。\n格式:卡号|月|年|CVC") _clear_task() return await _edit_or_send( status_msg, f"📋 读取到 {len(cards)} 张卡片,开始批量检查...", ) # 后台线程执行 loop = asyncio.get_event_loop() threading.Thread( target=_batch_check_worker, args=(loop, status_msg, cards, acc_lines[-1]), daemon=True, ).start() def _batch_check_worker(loop: asyncio.AbstractEventLoop, status_msg, cards: list, account_line: str): """批量 CC 检查工作线程""" from models import ClaudeAccount from identity import random_ua results = [] total = len(cards) try: acc_parts = account_line.split("|") email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2] account = ClaudeAccount(email, session_key, org_uuid, random_ua()) tokenizer = StripeTokenizer(account.user_agent) checker = GiftChecker(account) for i, card_line in enumerate(cards): cc, mm, yy, cvc = card_line.split("|") masked = f"{cc[:4]}****{cc[-4:]}" # 更新进度 recent = "\n".join(results[-5:]) # 显示最近 5 条结果 asyncio.run_coroutine_threadsafe( _edit_or_send( status_msg, f"🔍 [{i + 1}/{total}] 检查中:{masked}\n\n" + recent, ), loop, ).result(timeout=10) # Stripe Token pm_id = tokenizer.get_token(cc, mm, yy, cvc) if not pm_id: results.append(f"❌ {masked} → Stripe 拒绝") time.sleep(1) continue # Gift Purchase result = checker.purchase(pm_id) result_icons = { "LIVE": "💰", "DECLINED": "🚫", "INSUFFICIENT_FUNDS": "💸", "CCN_LIVE": "🔶", "DEAD": "💀", "ERROR": "⚠️", } icon = result_icons.get(result, "❓") results.append(f"{icon} {masked} → {result}") # 间隔防限流 if i < total - 1: time.sleep(2) # 最终汇报 live = sum(1 for r in results if "LIVE" in r and "CCN" not in r) dead = sum(1 for r in results if "DEAD" in r or "DECLINED" in r or "Stripe" in r) other = total - live - dead report = ( f"🏁 批量 CC 检查完成\n\n" f"📊 共 {total} 张 | 💰 有效 {live} | 💀 无效 {dead} | ❓ 其他 {other}\n\n" ) report += "\n".join(results) # Telegram 消息限制 4096 if len(report) > 4000: report = report[:4000] + "\n...(已截断)" asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, report), loop, ).result(timeout=10) except Exception as e: logger.exception("批量 CC 检查异常") asyncio.run_coroutine_threadsafe( _edit_or_send(status_msg, f"💥 批量 CC 检查异常:{e}"), loop, ) finally: _clear_task() # ============================================================ # 启动 Bot # ============================================================ async def post_init(application: Application): """Bot 启动后设置命令菜单""" commands = [ BotCommand("start", "欢迎信息"), BotCommand("register", "注册 Claude 账号 [数量]"), BotCommand("check", "CC 检查 <卡号|月|年|CVC>"), BotCommand("accounts", "查看已注册账号"), BotCommand("status", "当前任务状态"), BotCommand("help", "命令帮助"), ] await application.bot.set_my_commands(commands) logger.info("Bot 命令菜单已设置") def main(): """启动 Bot""" if TG_BOT_TOKEN == "your_bot_token_here": print("❌ 请先在 config.toml 中设置 TG_BOT_TOKEN!") return app = Application.builder().token(TG_BOT_TOKEN).post_init(post_init).build() # 注册命令 app.add_handler(CommandHandler("start", cmd_start)) app.add_handler(CommandHandler("help", cmd_help)) app.add_handler(CommandHandler("register", cmd_register)) app.add_handler(CommandHandler("check", cmd_check)) app.add_handler(CommandHandler("accounts", cmd_accounts)) app.add_handler(CommandHandler("status", cmd_status)) app.add_handler(MessageHandler(filters.Document.ALL, handle_document)) logger.info("🤖 Bot 启动中...") app.run_polling(allowed_updates=Update.ALL_TYPES) if __name__ == "__main__": main()