Files
autoClaude/bot.py

1446 lines
50 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
autoClaude Telegram Bot
基于 python-telegram-bot v21+ 的异步 Bot封装 Claude 注册和 CC 检查流程。
启动方式: uv run python bot.py
"""
import asyncio
import json
import logging
import os
import random
import string
import time
import threading
from functools import wraps
from telegram import Update, BotCommand, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import (
Application,
CallbackQueryHandler,
CommandHandler,
MessageHandler,
ContextTypes,
filters,
)
from config import (
TG_BOT_TOKEN,
TG_ALLOWED_USERS,
TG_USER_PERMISSIONS,
ADMIN_USERS,
get_merged_permissions,
MAIL_SYSTEMS,
)
from core.mail_service import MailPool, extract_magic_link
from core.stripe_token import StripeTokenizer
from core.gift_checker import GiftChecker
from core.claude_auth import attack_claude, finalize_login
from core import account_store
from core import proxy_pool
from core import permissions
# --- 日志配置 ---
logging.basicConfig(
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
level=logging.INFO,
)
logger = logging.getLogger(__name__)
# 降低 httpx 轮询日志级别,减少刷屏
logging.getLogger("httpx").setLevel(logging.WARNING)
# --- 全局状态 ---
_task_lock = threading.Lock()
_task_running = False
_task_name = ""
_stop_event = threading.Event()
# ============================================================
# 工具函数
# ============================================================
def restricted(func):
"""权限控制装饰器:检查用户是否有权使用当前命令。
权限检查规则:
1. 如果配置了 roles按角色的 commands 列表检查;"*" 表示全部权限
2. 如果没有配置 roles回退到 allowed_users 全量放行
3. 如果两者都没配,所有用户都可以使用
"""
# 从函数名提取命令名cmd_register -> register
cmd_name = func.__name__.removeprefix("cmd_").removeprefix("handle_")
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
if TG_USER_PERMISSIONS or True: # 总是检查合并权限
# 实时合并 config.toml + permissions.json
merged = get_merged_permissions()
if merged:
user_cmds = merged.get(user_id)
if user_cmds is None:
# 用户不在任何权限表中
if TG_ALLOWED_USERS or TG_USER_PERMISSIONS:
# 有限制配置但用户不在其中
await update.message.reply_text("⛔ 你没有权限使用此 Bot。")
return
# 没有任何限制配置:放行
elif "*" not in user_cmds and cmd_name not in user_cmds:
await update.message.reply_text(
f"⛔ 你没有权限使用 /{cmd_name} 命令。"
)
return
return await func(update, context, *args, **kwargs)
return wrapper
def _set_task(name: str) -> bool:
"""尝试设置任务锁,返回是否成功"""
global _task_running, _task_name
with _task_lock:
if _task_running:
return False
_task_running = True
_task_name = name
_stop_event.clear()
return True
def _clear_task():
"""释放任务锁"""
global _task_running, _task_name
with _task_lock:
_task_running = False
_task_name = ""
_stop_event.clear()
def _is_stopped() -> bool:
"""检查是否收到停止信号"""
return _stop_event.is_set()
async def _edit_or_send(msg, text: str):
"""安全地编辑消息,如果失败则发送新消息"""
try:
await msg.edit_text(text, parse_mode="HTML")
except Exception:
pass
def _progress_bar(current: int, total: int, width: int = 12) -> str:
"""生成可视化进度条 ▓▓▓▓░░░░ 3/10 (30%)"""
if total <= 0:
return ""
pct = current / total
filled = round(width * pct)
bar = "" * filled + "" * (width - filled)
return f"{bar} {current}/{total} ({round(pct * 100)}%)"
# ============================================================
# 命令处理
# ============================================================
async def cmd_start(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/start — 欢迎信息"""
welcome = (
"🤖 <b>autoClaude Bot</b>\n\n"
"可用命令:\n"
" /register [N] — 注册 Claude 账号\n"
" /check &lt;卡号|月|年|CVC&gt; — CC 检查\n"
" 📎 发送 .txt 文件 — 批量 CC 检查\n"
" /accounts — 查看账号 | /delete — 删除账号\n"
" /verify — 验证 SK 有效性\n"
" /stats — 统计面板\n"
" /stop — 中断当前任务\n"
" /proxy on|off — 代理开关\n"
" /proxytest — 测试代理 | /proxystatus — 代理状态\n"
" /mailstatus — 邮件系统状态\n"
" /status — 任务状态 | /help — 帮助\n\n"
f"👤 你的用户 ID: <code>{update.effective_user.id}</code>"
)
await update.message.reply_text(welcome, parse_mode="HTML")
async def cmd_help(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/help — 命令列表"""
text = (
"📖 <b>命令说明</b>\n\n"
"<b>📝 注册与账号</b>\n"
" /register [N] — 注册 N 个账号\n"
" /accounts — 查看已注册账号\n"
" /delete &lt;序号|邮箱&gt; — 删除账号\n"
" /verify — 验证 SK 有效性\n\n"
"<b>💳 CC 检查</b>\n"
" /check &lt;CARD|MM|YY|CVC&gt; — 单张检查\n"
" 📎 发送 .txt 文件 — 批量检查\n\n"
"<b>🛠 工具与状态</b>\n"
" /stop — 中断当前任务\n"
" /stats — 统计面板\n"
" /status — 任务状态\n\n"
"<b>🌐 代理与邮件</b>\n"
" /proxy on|off — 开关代理\n"
" /proxytest — 测试代理\n"
" /proxystatus — 代理池状态\n"
" /mailstatus — 邮件系统状态\n\n"
"<b>🔐 用户管理(仅管理员)</b>\n"
" /adduser &lt;ID&gt; [cmds] — 添加用户\n"
" /removeuser &lt;ID&gt; — 移除用户\n"
" /setperm &lt;ID&gt; &lt;cmds&gt; — 修改权限\n"
" /users — 查看用户列表\n"
)
await update.message.reply_text(text, parse_mode="HTML")
@restricted
async def cmd_status(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/status — 查看任务状态"""
lines = []
# 全局任务(注册等)
with _task_lock:
if _task_running:
lines.append(f"⏳ 当前任务:<b>{_task_name}</b>")
# 账号池状态
ps = account_store.pool_status()
if ps["total"] > 0:
lines.append(
f"🗄 账号池:共 {ps['total']} | "
f"🟢 空闲 {ps['free']} | 🔴 占用 {ps['busy']}"
)
if not lines:
await update.message.reply_text("✅ 当前无运行中的任务。")
else:
await update.message.reply_text("\n".join(lines), parse_mode="HTML")
@restricted
async def cmd_stop(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/stop — 中断当前运行的任务"""
with _task_lock:
if not _task_running:
await update.message.reply_text("✅ 当前没有运行中的任务。")
return
_stop_event.set()
await update.message.reply_text(
f"⏹ 正在停止任务:<b>{_task_name}</b>\n"
"任务将在当前步骤完成后安全退出...",
parse_mode="HTML",
)
@restricted
async def cmd_mailstatus(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/mailstatus — 检查邮箱系统连通性"""
status_msg = await update.message.reply_text("🔍 正在检测邮件系统...")
mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
await _edit_or_send(status_msg, "❌ 没有可用的邮箱系统!请检查 config.toml 配置。")
return
text = f"📬 <b>邮件系统状态(共 {mail_pool.count} 个)</b>\n\n"
for i, ms in enumerate(mail_pool.systems, 1):
health = ms.check_health()
icon = "" if health["ok"] else ""
domains = ", ".join(ms.domains)
text += f"{i}. {icon} <code>{ms.base_url}</code>\n"
text += f" 域名: {domains}\n"
text += f" 状态: {health['message']}\n"
await _edit_or_send(status_msg, text)
@restricted
async def cmd_proxytest(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/proxytest — 测试所有代理的连通性"""
pp = proxy_pool.pool
if pp.count == 0:
await update.message.reply_text("❌ 代理池为空proxy.txt 不存在或无有效代理)")
return
total = pp.count
status_msg = await update.message.reply_text(
f"🔍 正在测试 {total} 个代理...\n"
f"{_progress_bar(0, total)}",
parse_mode="HTML",
)
loop = asyncio.get_event_loop()
# 在后台线程中运行测试,通过回调更新进度
def _run_test():
ok_count = 0
fail_count = 0
last_update_time = 0
def on_progress(current, total, result):
nonlocal ok_count, fail_count, last_update_time
if result["ok"]:
ok_count += 1
else:
fail_count += 1
# 限制更新频率:最少 2 秒更新一次,或者最后一个时强制更新
now = time.time()
is_last = (current == total)
if not is_last and (now - last_update_time) < 2:
return
last_update_time = now
icon = "" if result["ok"] else ""
latency = f"{result['latency_ms']}ms" if result.get('latency_ms', -1) > 0 else "-"
text = (
f"🔍 <b>代理测试中...</b>\n"
f"{_progress_bar(current, total)}\n\n"
f"✅ 通过: {ok_count} ❌ 失败: {fail_count}\n\n"
f"最新: {icon} <code>{result['proxy']}</code> {latency}"
)
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, text), loop
)
return pp.test_all(progress_callback=on_progress)
results = await loop.run_in_executor(None, _run_test)
# 构建最终结果报告
ok_count = sum(1 for r in results if r["ok"])
fail_count = len(results) - ok_count
text = (
f"🎯 <b>代理测试结果</b>\n\n"
f"✅ 通过: {ok_count} ❌ 失败: {fail_count} "
f"🚧 剩余可用: {pp.active_count}\n\n"
)
for r in results:
icon = "" if r["ok"] else ""
latency = f"{r['latency_ms']}ms" if r['latency_ms'] > 0 else "-"
prio = r.get('priority', '-')
text += f"{icon} <code>{r['proxy']}</code>\n"
text += f" 延迟: {latency} | 优先级: {prio}\n"
if not r["ok"]:
text += f" 错误: {r.get('error', '?')}\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await _edit_or_send(status_msg, text)
@restricted
async def cmd_proxystatus(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/proxystatus — 查看代理池状态"""
pp = proxy_pool.pool
if pp.count == 0:
await update.message.reply_text("❌ 代理池为空proxy.txt 不存在或无有效代理)")
return
items = pp.status_list()
text = f"🌐 <b>代理池状态(共 {len(items)} 个)</b>\n\n"
for i, item in enumerate(items, 1):
icon = "" if item["last_ok"] else ""
text += (
f"{i}. {icon} <code>{item['proxy']}</code>\n"
f" 优先级: {item['priority']} | "
f"延迟: {item['latency_ms']}ms | "
f"{item['success']}{item['fail']}\n"
)
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await update.message.reply_text(text, parse_mode="HTML")
# ============================================================
# 用户权限管理(仅管理员)
# ============================================================
from core import permissions
def _admin_only(func):
"""管理员专属装饰器:只有 config.toml 中 commands=["*"] 的用户可使用"""
@wraps(func)
async def wrapper(update: Update, context: ContextTypes.DEFAULT_TYPE, *args, **kwargs):
user_id = update.effective_user.id
if user_id not in ADMIN_USERS:
await update.message.reply_text("⛔ 此命令仅管理员可用。")
return
return await func(update, context, *args, **kwargs)
return wrapper
@_admin_only
async def cmd_adduser(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/adduser <user_id> [cmd1,cmd2,...|*] — 添加用户并设置权限"""
if not context.args or len(context.args) < 1:
await update.message.reply_text(
"❌ 用法:\n"
" <code>/adduser 123456789 *</code> — 全部权限\n"
" <code>/adduser 123456789 accounts,verify,stats</code> — 指定命令\n\n"
"💡 可用命令列表:\n"
f" <code>{', '.join(sorted(permissions.ALL_COMMANDS))}</code>",
parse_mode="HTML",
)
return
try:
target_uid = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 用户 ID 必须是数字。")
return
# 解析权限
if len(context.args) >= 2:
cmd_arg = context.args[1]
if cmd_arg == "*":
cmds = ["*"]
else:
cmds = [c.strip() for c in cmd_arg.split(",") if c.strip()]
# 验证命令名
invalid = [c for c in cmds if c not in permissions.ALL_COMMANDS]
if invalid:
await update.message.reply_text(
f"❌ 未知命令: {', '.join(invalid)}\n\n"
f"可用命令:\n<code>{', '.join(sorted(permissions.ALL_COMMANDS))}</code>",
parse_mode="HTML",
)
return
else:
# 默认给基础查看权限
cmds = ["start", "help", "accounts", "verify", "stats", "status"]
permissions.add_user(target_uid, cmds)
cmd_display = "全部命令" if cmds == ["*"] else ", ".join(cmds)
await update.message.reply_text(
f"✅ 已添加用户 <code>{target_uid}</code>\n"
f"🔑 权限: {cmd_display}",
parse_mode="HTML",
)
@_admin_only
async def cmd_removeuser(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/removeuser <user_id> — 移除用户"""
if not context.args:
await update.message.reply_text("❌ 用法: <code>/removeuser 123456789</code>", parse_mode="HTML")
return
try:
target_uid = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 用户 ID 必须是数字。")
return
# 不能移除 config.toml 中的管理员
if target_uid in ADMIN_USERS:
await update.message.reply_text("❌ 不能移除 config.toml 中定义的管理员。")
return
if permissions.remove_user(target_uid):
await update.message.reply_text(f"🗑 已移除用户 <code>{target_uid}</code>", parse_mode="HTML")
else:
await update.message.reply_text(f"❌ 用户 <code>{target_uid}</code> 不在运行时权限列表中。", parse_mode="HTML")
@_admin_only
async def cmd_setperm(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/setperm <user_id> <cmd1,cmd2,...|*> — 修改用户权限"""
if not context.args or len(context.args) < 2:
await update.message.reply_text(
"❌ 用法:\n"
" <code>/setperm 123456789 *</code>\n"
" <code>/setperm 123456789 accounts,verify,stats</code>\n\n"
f"可用命令:\n<code>{', '.join(sorted(permissions.ALL_COMMANDS))}</code>",
parse_mode="HTML",
)
return
try:
target_uid = int(context.args[0])
except ValueError:
await update.message.reply_text("❌ 用户 ID 必须是数字。")
return
# 不能修改 config.toml 管理员
if target_uid in ADMIN_USERS:
await update.message.reply_text("❌ 不能修改 config.toml 中定义的管理员权限。")
return
cmd_arg = context.args[1]
if cmd_arg == "*":
cmds = ["*"]
else:
cmds = [c.strip() for c in cmd_arg.split(",") if c.strip()]
invalid = [c for c in cmds if c not in permissions.ALL_COMMANDS]
if invalid:
await update.message.reply_text(
f"❌ 未知命令: {', '.join(invalid)}",
parse_mode="HTML",
)
return
# 如果用户不存在,自动添加
user = permissions.get_user(target_uid)
if user:
permissions.set_commands(target_uid, cmds)
else:
permissions.add_user(target_uid, cmds)
cmd_display = "全部命令" if cmds == ["*"] else ", ".join(cmds)
await update.message.reply_text(
f"✅ 用户 <code>{target_uid}</code> 权限已更新\n"
f"🔑 权限: {cmd_display}",
parse_mode="HTML",
)
@_admin_only
async def cmd_users(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/users — 查看所有用户及权限"""
merged = get_merged_permissions()
if not merged:
await update.message.reply_text("📭 没有任何用户。")
return
text = "👥 <b>用户权限列表</b>\n\n"
# config.toml 管理员
text += "<b>📌 管理员config.toml</b>\n"
for uid in sorted(ADMIN_USERS):
text += f" • <code>{uid}</code> — ✨ 全部权限\n"
# config.toml 非管理员角色
static_non_admin = {
uid: cmds for uid, cmds in TG_USER_PERMISSIONS.items()
if uid not in ADMIN_USERS
}
if static_non_admin:
text += "\n<b>📌 静态角色config.toml</b>\n"
for uid, cmds in sorted(static_non_admin.items()):
cmd_str = ", ".join(sorted(cmds)) if "*" not in cmds else "全部命令"
text += f" • <code>{uid}</code> — {cmd_str}\n"
# 运行时添加的用户
runtime_users = permissions.list_users()
if runtime_users:
text += "\n<b>🔧 运行时用户Bot 添加)</b>\n"
for uid, info in sorted(runtime_users.items()):
cmds = info.get("commands", [])
cmd_str = ", ".join(sorted(cmds)) if "*" not in cmds else "全部命令"
text += f" • <code>{uid}</code> — {cmd_str}\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await update.message.reply_text(text, parse_mode="HTML")
@restricted
async def cmd_proxy(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/proxy on|off — 开关代理"""
pp = proxy_pool.pool
if not context.args:
# 无参数:显示当前状态
status = "✅ 已开启" if pp.enabled else "❌ 已关闭"
await update.message.reply_text(
f"🌐 <b>代理状态</b>: {status}\n"
f"📦 代理池: {pp.count} 个(活跃 {pp.active_count} 个)\n\n"
f"用法: <code>/proxy on</code> 或 <code>/proxy off</code>",
parse_mode="HTML",
)
return
arg = context.args[0].lower()
if arg == "on":
pp.enabled = True
await update.message.reply_text(
f"✅ 代理已开启(池中 {pp.active_count} 个可用)"
)
elif arg == "off":
pp.enabled = False
await update.message.reply_text("❌ 代理已关闭,所有请求将直连")
else:
await update.message.reply_text("❌ 用法: /proxy on 或 /proxy off")
@restricted
async def cmd_accounts(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/accounts — 列出已注册账号"""
accounts = account_store.read_all()
if not accounts:
await update.message.reply_text("📭 尚无已注册账号。")
return
text = f"📋 <b>已注册账号(共 {len(accounts)} 个)</b>\n\n"
for i, acc in enumerate(accounts, 1):
text += f"{i}. <code>{acc['email']}</code>\n SK: <code>{acc['session_key']}</code>\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断,请点击下方按钮导出完整数据)"
keyboard = [[InlineKeyboardButton("📥 导出 JSON 文件", callback_data="export_accounts_json")]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(text, parse_mode="HTML", reply_markup=reply_markup)
@restricted
async def cmd_delete(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/delete <序号|邮箱> — 删除指定账号"""
user_id = update.effective_user.id
if not context.args:
await update.message.reply_text(
"❌ 用法:\n"
" <code>/delete 3</code> — 按序号删除\n"
" <code>/delete user@example.com</code> — 按邮箱删除\n\n"
"💡 使用 /accounts 查看序号",
parse_mode="HTML",
)
return
arg = context.args[0]
removed = None
# 尝试按序号删除
try:
index = int(arg)
removed = account_store.delete_by_index(index)
if not removed:
total = account_store.count()
await update.message.reply_text(f"❌ 无效序号。当前共 {total} 个账号。")
return
except ValueError:
# 按邮箱删除
removed = account_store.delete_by_email(arg)
if not removed:
await update.message.reply_text(f"❌ 未找到邮箱:{arg}")
return
await update.message.reply_text(
f"🗑 已删除账号:\n"
f"📧 <code>{removed['email']}</code>\n"
f"剩余 {account_store.count()} 个账号",
parse_mode="HTML",
)
@restricted
async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/verify — 检测已保存的 Session Key 是否仍然有效"""
accounts = account_store.read_all()
if not accounts:
await update.message.reply_text("📭 尚无已注册账号。")
return
status_msg = await update.message.reply_text(
f"🔍 正在验证 {len(accounts)} 个账号的 Session Key..."
)
from curl_cffi import requests as cffi_requests
from config import get_proxy
results = []
for i, acc in enumerate(accounts, 1):
sk = acc.get("session_key", "")
email = acc.get("email", "?")
if not sk:
results.append({"email": email, "ok": False, "reason": "SK 为空"})
continue
try:
resp = cffi_requests.get(
"https://claude.ai/api/organizations",
headers={
"Cookie": f"sessionKey={sk}",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
},
impersonate="chrome124",
proxies=get_proxy(),
timeout=15,
)
if resp.status_code == 200:
results.append({"email": email, "ok": True, "reason": "有效"})
elif resp.status_code == 401:
results.append({"email": email, "ok": False, "reason": "已过期"})
elif resp.status_code == 403:
results.append({"email": email, "ok": False, "reason": "被封禁"})
else:
results.append({"email": email, "ok": False, "reason": f"HTTP {resp.status_code}"})
except Exception as e:
results.append({"email": email, "ok": False, "reason": str(e)[:50]})
valid = sum(1 for r in results if r["ok"])
invalid = len(results) - valid
text = (
f"🔑 <b>账号验证结果</b>\n\n"
f"✅ 有效: {valid} ❌ 无效: {invalid}\n\n"
)
for i, r in enumerate(results, 1):
icon = "" if r["ok"] else ""
text += f"{i}. {icon} <code>{r['email']}</code> — {r['reason']}\n"
if len(text) > 4000:
text = text[:4000] + "\n...(已截断)"
await _edit_or_send(status_msg, text)
@restricted
async def cmd_stats(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/stats — 展示历史统计数据"""
stats = account_store.get_stats()
reg_total = stats.get("register_total", 0)
reg_ok = stats.get("register_success", 0)
reg_fail = stats.get("register_fail", 0)
reg_rate = f"{round(reg_ok / reg_total * 100)}%" if reg_total > 0 else "-"
cc_total = stats.get("cc_total", 0)
cc_ok = stats.get("cc_pass", 0)
cc_fail = stats.get("cc_fail", 0)
cc_rate = f"{round(cc_ok / cc_total * 100)}%" if cc_total > 0 else "-"
text = (
"📊 <b>统计面板</b>\n\n"
f"<b>📝 注册统计</b>\n"
f" 总计: {reg_total} | ✅ {reg_ok} | ❌ {reg_fail}\n"
f" 成功率: {reg_rate}\n"
)
# 失败原因分布
reasons = stats.get("register_fail_reasons", {})
if reasons:
text += " <b>失败原因:</b>\n"
for reason, cnt in sorted(reasons.items(), key=lambda x: -x[1]):
text += f"{reason}: {cnt}\n"
text += (
f"\n<b>💳 CC 检查统计</b>\n"
f" 总计: {cc_total} | ✅ {cc_ok} | ❌ {cc_fail}\n"
f" 通过率: {cc_rate}\n"
)
text += f"\n<b>📦 当前账号数</b>: {account_store.count()}"
await update.message.reply_text(text, parse_mode="HTML")
# ============================================================
# /register — 注册 Claude 账号
# ============================================================
@restricted
async def cmd_register(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/register [N] — 注册 Claude 账号"""
# 解析数量
count = 1
if context.args:
try:
count = int(context.args[0])
if count < 1 or count > 20:
await update.message.reply_text("❌ 数量范围1-20")
return
except ValueError:
await update.message.reply_text("❌ 用法:/register [数量]")
return
if not _set_task(f"注册 {count} 个账号"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
status_msg = await update.message.reply_text(
f"🚀 开始注册 {count} 个 Claude 账号...",
parse_mode="HTML",
)
# 在后台线程执行耗时操作
loop = asyncio.get_event_loop()
threading.Thread(
target=_register_worker,
args=(loop, status_msg, count),
daemon=True,
).start()
def _register_worker(loop: asyncio.AbstractEventLoop, status_msg, count: int):
"""注册工作线程"""
results = {"success": 0, "fail": 0, "accounts": [], "fail_reasons": []}
try:
# 初始化邮箱系统池
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "📧 正在连接邮件系统..."),
loop,
).result(timeout=10)
mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "❌ 没有可用的邮箱系统!"),
loop,
).result(timeout=10)
return
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"📧 邮箱系统就绪({mail_pool.count} 个)\n🚀 开始注册..."),
loop,
).result(timeout=10)
for i in range(count):
bar = _progress_bar(i, count)
step_header = f"📊 {bar}\n"
# Step 1: 创建邮箱(轮询选系统)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 创建临时邮箱...\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
# 检查停止信号
if _is_stopped():
break
random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
target_email, mail_sys = mail_pool.create_user(random_prefix)
if not target_email or not mail_sys:
results["fail"] += 1
reason = "邮箱创建失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
# 检查停止信号
if _is_stopped():
break
# Step 2: 发送 Magic Link
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 发送 Magic Link...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
if _is_stopped():
break
if not attack_claude(target_email):
results["fail"] += 1
reason = "Magic Link 发送失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
# Step 3: 等待邮件(使用创建时对应的系统)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 等待 Claude 邮件...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
email_content = mail_sys.wait_for_email(target_email, stop_check=_is_stopped)
if _is_stopped():
break
if not email_content:
results["fail"] += 1
reason = "邮件接收超时"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
magic_link = extract_magic_link(email_content)
if not magic_link:
results["fail"] += 1
reason = "Magic Link 解析失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
continue
# Step 4: 交换 SessionKey
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{step_header}⏳ 交换 SessionKey...\n"
f"📧 {target_email}\n\n"
f"✅ 成功: {results['success']} ❌ 失败: {results['fail']}",
),
loop,
).result(timeout=10)
if _is_stopped():
break
account = finalize_login(magic_link)
if account:
results["success"] += 1
results["accounts"].append(account)
account_store.append(account.email, account.session_key, account.org_uuid)
account_store.record_register(True)
else:
results["fail"] += 1
reason = "SessionKey 交换失败"
results["fail_reasons"].append(reason)
account_store.record_register(False, reason)
# 间隔防止限流
if i < count - 1:
time.sleep(2)
# 最终汇报
stopped = _is_stopped()
done_bar = _progress_bar(results["success"] + results["fail"], count)
title = "⏹ <b>注册已中断</b>" if stopped else "🏁 <b>注册完成</b>"
report = (
f"{title}\n"
f"📊 {done_bar}\n\n"
f"✅ 成功: {results['success']}\n"
f"❌ 失败: {results['fail']}\n"
)
# 失败原因汇总
if results["fail_reasons"]:
from collections import Counter
reason_counts = Counter(results["fail_reasons"])
report += "\n<b>失败原因:</b>\n"
for reason, cnt in reason_counts.most_common():
report += f"{reason}: {cnt}\n"
if results["accounts"]:
report += "\n<b>新注册账号:</b>\n"
for acc in results["accounts"]:
report += (
f"• <code>{acc.email}</code>\n"
f" SK: <code>{acc.session_key}</code>\n"
)
# Telegram 消息限制 4096 字符
if len(report) > 4000:
report = report[:4000] + "\n...(已截断,使用 /accounts 查看完整列表)"
keyboard = [[InlineKeyboardButton("📥 导出 JSON 文件", callback_data="export_accounts_json")]]
reply_markup = InlineKeyboardMarkup(keyboard)
async def _send_final():
try:
await status_msg.edit_text(report, parse_mode="HTML", reply_markup=reply_markup)
except Exception:
pass
asyncio.run_coroutine_threadsafe(
_send_final(),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("注册任务异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 注册任务异常:{e}"),
loop,
)
finally:
_clear_task()
# ============================================================
# /check — CC 检查
# ============================================================
@restricted
async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/check <CARD|MM|YY|CVC> — CC 检查(支持多行批量)"""
if not context.args:
await update.message.reply_text(
"❌ 用法:/check <code>卡号|月|年|CVC</code>\n"
"支持一次粘贴多行:\n"
"<code>/check\n"
"4111111111111111|12|25|123\n"
"5200000000000007|01|26|456</code>",
parse_mode="HTML",
)
return
# 解析所有行,提取有效卡片
cards = []
for arg in context.args:
arg = arg.strip()
if not arg:
continue
parts = arg.split("|")
if len(parts) == 4:
cards.append(arg)
if not cards:
await update.message.reply_text(
"❌ 没有找到有效卡片。\n格式:<code>卡号|月|年|CVC</code>",
parse_mode="HTML",
)
return
# 从账号池获取空闲账号
if len(cards) == 1:
acquired = account_store.acquire(1)
else:
acquired = account_store.acquire() # 尽量获取所有空闲
if not acquired:
ps = account_store.pool_status()
if ps["total"] == 0:
await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。")
else:
await update.message.reply_text(
f"⏳ 所有账号繁忙({ps['busy']}/{ps['total']}),请稍后再试。"
)
return
if len(cards) == 1:
# 单卡检查
parts = cards[0].split("|")
status_msg = await update.message.reply_text(
f"🔍 正在检查卡片:<code>{parts[0][:4]}****{parts[0][-4:]}</code>",
parse_mode="HTML",
)
loop = asyncio.get_event_loop()
threading.Thread(
target=_check_worker,
args=(loop, status_msg, cards[0], acquired[0]),
daemon=True,
).start()
else:
# 多卡批量检查
status_msg = await update.message.reply_text(
f"📋 解析到 <b>{len(cards)}</b> 张卡片,{len(acquired)} 个账号就绪,开始批量检查...",
parse_mode="HTML",
)
loop = asyncio.get_event_loop()
threading.Thread(
target=_batch_check_worker,
args=(loop, status_msg, cards, acquired),
daemon=True,
).start()
def _check_worker(loop: asyncio.AbstractEventLoop, status_msg, card_line: str, account_line: str):
"""CC 检查工作线程(完成后自动释放账号)"""
try:
cc, mm, yy, cvc = card_line.split("|")
acc_parts = account_line.split("|")
email, session_key, org_uuid = acc_parts[0], acc_parts[1], acc_parts[2]
from core.models import ClaudeAccount
from core.identity import random_ua
account = ClaudeAccount(email, session_key, org_uuid, random_ua())
masked = f"{cc[:4]}****{cc[-4:]}"
# Step 1: Stripe Token
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 <code>{masked}</code>\n⏳ 获取 Stripe Token..."),
loop,
).result(timeout=10)
tokenizer = StripeTokenizer(account.user_agent)
pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if not pm_id:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 <code>{masked}</code>\n❌ Stripe 拒绝,无法获取 Token"),
loop,
).result(timeout=10)
return
# Step 2: Gift Purchase
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"🔍 <code>{masked}</code>\n⏳ 尝试扣款验证..."),
loop,
).result(timeout=10)
checker = GiftChecker(account)
result = checker.purchase(pm_id)
# 结果映射
result_map = {
"LIVE": "💰 LIVE — 扣款成功!卡有效",
"DECLINED": "🚫 DECLINED — 被拒绝",
"INSUFFICIENT_FUNDS": "💸 INSUFFICIENT — 余额不足(卡有效)",
"CCN_LIVE": "🔶 CCN LIVE — 卡号有效但 CVC 错误",
"DEAD": "💀 DEAD — 无效卡",
"ERROR": "⚠️ ERROR — 检查出错",
}
result_text = result_map.get(result, f"❓ 未知结果:{result}")
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"🔍 <b>CC 检查结果</b>\n\n"
f"卡片:<code>{masked}</code>\n"
f"结果:{result_text}",
),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("CC 检查异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 CC 检查异常:{e}"),
loop,
)
finally:
account_store.release([account_line])
# ============================================================
# 文件上传 — 批量 CC 检查
# ============================================================
@restricted
async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""接收 .txt 文件进行批量 CC 检查"""
doc = update.message.document
# 检查文件类型
if not doc.file_name.endswith(".txt"):
await update.message.reply_text("❌ 仅支持 .txt 文件")
return
# 检查文件大小(限制 1MB
if doc.file_size > 1024 * 1024:
await update.message.reply_text("❌ 文件太大(最大 1MB")
return
# 从账号池获取空闲账号
acquired = account_store.acquire() # 获取所有空闲
if not acquired:
ps = account_store.pool_status()
if ps["total"] == 0:
await update.message.reply_text("❌ 没有可用账号,请先 /register 注册一个。")
else:
await update.message.reply_text(
f"⏳ 所有账号繁忙({ps['busy']}/{ps['total']}),请稍后再试。"
)
return
# 下载文件
status_msg = await update.message.reply_text("📥 正在下载文件...")
try:
file = await doc.get_file()
file_bytes = await file.download_as_bytearray()
content = file_bytes.decode("utf-8", errors="ignore")
except Exception as e:
await _edit_or_send(status_msg, f"💥 下载文件失败:{e}")
account_store.release(acquired)
return
# 解析卡片
cards = []
for line in content.splitlines():
line = line.strip()
if line and not line.startswith("#"):
parts = line.split("|")
if len(parts) == 4:
cards.append(line)
if not cards:
await _edit_or_send(status_msg, "❌ 文件中没有找到有效卡片。\n格式:<code>卡号|月|年|CVC</code>")
account_store.release(acquired)
return
await _edit_or_send(
status_msg,
f"📋 读取到 <b>{len(cards)}</b> 张卡片,开始批量检查...",
)
# 后台线程执行
loop = asyncio.get_event_loop()
threading.Thread(
target=_batch_check_worker,
args=(loop, status_msg, cards, acquired),
daemon=True,
).start()
def _batch_check_worker(loop: asyncio.AbstractEventLoop, status_msg, cards: list, acc_lines: list):
"""批量 CC 检查工作线程round-robin 轮询账号)"""
from core.models import ClaudeAccount
from core.identity import random_ua
results = []
total = len(cards)
num_accounts = len(acc_lines)
# 预构建所有账号对象
accounts = []
for line in acc_lines:
parts = line.split("|")
email, session_key, org_uuid = parts[0], parts[1], parts[2]
acc = ClaudeAccount(email, session_key, org_uuid, random_ua())
accounts.append(acc)
# 为每个账号创建对应的 tokenizer 和 checker
tokenizers = [StripeTokenizer(acc.user_agent) for acc in accounts]
checkers = [GiftChecker(acc) for acc in accounts]
try:
for i, card_line in enumerate(cards):
if _is_stopped():
break
# Round-robin 选择账号
idx = i % num_accounts
current_acc = accounts[idx]
tokenizer = tokenizers[idx]
checker = checkers[idx]
acc_label = current_acc.email.split("@")[0] # 简短标识
cc, mm, yy, cvc = card_line.split("|")
masked = f"{cc[:4]}****{cc[-4:]}"
# 更新进度
recent = "\n".join(results[-5:]) # 显示最近 5 条结果
acc_info = f" 👤 账号 {idx + 1}/{num_accounts}" if num_accounts > 1 else ""
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"🔍 [{i + 1}/{total}] 检查中:<code>{masked}</code>{acc_info}\n\n"
+ recent,
),
loop,
).result(timeout=10)
# Stripe Token
pm_id = tokenizer.get_token(cc, mm, yy, cvc)
if not pm_id:
results.append(f"❌ <code>{masked}</code> → Stripe 拒绝")
time.sleep(1)
continue
# Gift Purchase
result = checker.purchase(pm_id)
result_icons = {
"LIVE": "💰", "DECLINED": "🚫",
"INSUFFICIENT_FUNDS": "💸", "CCN_LIVE": "🔶",
"DEAD": "💀", "ERROR": "⚠️",
}
icon = result_icons.get(result, "")
# 仅成功结果显示完整卡号 + 对应邮箱
live_results = {"LIVE", "INSUFFICIENT_FUNDS", "CCN_LIVE"}
if result in live_results:
results.append(f"{icon} <code>{card_line}</code> → {result}\n 📧 {current_acc.email}")
else:
results.append(f"{icon} <code>{masked}</code> → {result}")
# 间隔防限流
if i < total - 1:
time.sleep(2)
# 最终汇报
live = sum(1 for r in results if "LIVE" in r and "CCN" not in r)
dead = sum(1 for r in results if "DEAD" in r or "DECLINED" in r or "Stripe" in r)
other = total - live - dead
acc_note = f" | 👤 {num_accounts} 个账号轮询" if num_accounts > 1 else ""
report = (
f"🏁 <b>批量 CC 检查完成</b>\n\n"
f"📊 共 {total} 张 | 💰 有效 {live} | 💀 无效 {dead} | ❓ 其他 {other}{acc_note}\n\n"
)
report += "\n".join(results)
# Telegram 消息限制 4096
if len(report) > 4000:
report = report[:4000] + "\n...(已截断)"
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, report),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("批量 CC 检查异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 批量 CC 检查异常:{e}"),
loop,
)
finally:
account_store.release(acc_lines)
# ============================================================
# 回调处理 — 导出 JSON
# ============================================================
async def callback_export_json(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""处理导出 JSON 文件的回调"""
query = update.callback_query
await query.answer()
accounts = account_store.read_all()
if not accounts:
await query.message.reply_text("📭 没有账号数据。")
return
json_data = json.dumps(accounts, indent=2, ensure_ascii=False)
json_path = "accounts_export.json"
with open(json_path, "w", encoding="utf-8") as f:
f.write(json_data)
with open(json_path, "rb") as f:
await query.message.reply_document(
document=f,
filename="accounts.json",
caption=f"📋 共 {len(accounts)} 个账号",
)
# 清理临时文件
try:
os.remove(json_path)
except OSError:
pass
# ============================================================
# 启动 Bot
# ============================================================
async def post_init(application: Application):
"""Bot 启动后设置命令菜单 + 推送 help"""
commands = [
BotCommand("start", "欢迎信息"),
BotCommand("register", "注册 Claude 账号 [数量]"),
BotCommand("stop", "中断当前任务"),
BotCommand("check", "CC 检查"),
BotCommand("accounts", "查看账号"),
BotCommand("delete", "删除账号"),
BotCommand("verify", "验证 SK 有效性"),
BotCommand("stats", "统计面板"),
BotCommand("mailstatus", "邮件系统状态"),
BotCommand("proxy", "代理开关 on/off"),
BotCommand("proxytest", "测试代理"),
BotCommand("proxystatus", "代理池状态"),
BotCommand("adduser", "添加用户"),
BotCommand("removeuser", "移除用户"),
BotCommand("setperm", "修改用户权限"),
BotCommand("users", "用户列表"),
BotCommand("status", "任务状态"),
BotCommand("help", "命令帮助"),
]
await application.bot.set_my_commands(commands)
logger.info("Bot 命令菜单已设置")
# 向所有已知用户推送启动通知 + help
merged = get_merged_permissions()
user_ids = set(merged.keys()) | set(TG_ALLOWED_USERS)
if not user_ids:
logger.info("未配置用户,跳过启动通知")
return
startup_msg = (
"🤖 <b>autoClaude Bot 已启动</b>\n\n"
"📖 <b>命令说明</b>\n\n"
"<b>📝 注册与账号</b>\n"
" /register [N] — 注册 N 个账号\n"
" /accounts — 查看已注册账号\n"
" /delete &lt;序号|邮箱&gt; — 删除账号\n"
" /verify — 验证 SK 有效性\n\n"
"<b>💳 CC 检查</b>\n"
" /check &lt;CARD|MM|YY|CVC&gt; — 单张检查\n"
" 📎 发送 .txt 文件 — 批量检查\n\n"
"<b>🛠 工具与状态</b>\n"
" /stop — 中断当前任务\n"
" /stats — 统计面板\n"
" /status — 任务状态\n\n"
"<b>🌐 代理与邮件</b>\n"
" /proxy on|off — 开关代理\n"
" /proxytest — 测试代理\n"
" /proxystatus — 代理池状态\n"
" /mailstatus — 邮件系统状态\n\n"
"<b>🔐 用户管理(仅管理员)</b>\n"
" /adduser &lt;ID&gt; [cmds] — 添加用户\n"
" /removeuser &lt;ID&gt; — 移除用户\n"
" /setperm &lt;ID&gt; &lt;cmds&gt; — 修改权限\n"
" /users — 查看用户列表\n"
)
for uid in user_ids:
try:
await application.bot.send_message(
chat_id=uid, text=startup_msg, parse_mode="HTML",
)
except Exception as e:
logger.debug(f"启动通知发送失败 (uid={uid}): {e}")
logger.info(f"启动通知已推送给 {len(user_ids)} 个用户")
def main():
"""启动 Bot"""
if TG_BOT_TOKEN == "your_bot_token_here":
print("❌ 请先在 config.toml 中设置 TG_BOT_TOKEN")
return
app = Application.builder().token(TG_BOT_TOKEN).post_init(post_init).build()
# 注册命令
app.add_handler(CommandHandler("start", cmd_start))
app.add_handler(CommandHandler("help", cmd_help))
app.add_handler(CommandHandler("register", cmd_register))
app.add_handler(CommandHandler("stop", cmd_stop))
app.add_handler(CommandHandler("check", cmd_check))
app.add_handler(CommandHandler("accounts", cmd_accounts))
app.add_handler(CommandHandler("delete", cmd_delete))
app.add_handler(CommandHandler("verify", cmd_verify))
app.add_handler(CommandHandler("stats", cmd_stats))
app.add_handler(CommandHandler("mailstatus", cmd_mailstatus))
app.add_handler(CommandHandler("proxytest", cmd_proxytest))
app.add_handler(CommandHandler("proxystatus", cmd_proxystatus))
app.add_handler(CommandHandler("proxy", cmd_proxy))
app.add_handler(CommandHandler("status", cmd_status))
app.add_handler(CommandHandler("adduser", cmd_adduser))
app.add_handler(CommandHandler("removeuser", cmd_removeuser))
app.add_handler(CommandHandler("setperm", cmd_setperm))
app.add_handler(CommandHandler("users", cmd_users))
app.add_handler(CallbackQueryHandler(callback_export_json, pattern="^export_accounts_json$"))
app.add_handler(MessageHandler(filters.Document.ALL, handle_document))
logger.info("🤖 Bot 启动中...")
app.run_polling(allowed_updates=Update.ALL_TYPES)
if __name__ == "__main__":
main()