feat: Implement Telegram bot with Claude authentication, mail service, and identity spoofing, refactoring core logic into new modules. #1

Open
mygo-kyx wants to merge 13 commits from mygo-kyx/autoClaude-TGbot:main into Feature-FromFork-Kunkun
2 changed files with 191 additions and 1 deletions
Showing only changes of commit 768963666a - Show all commits

160
bot.py
View File

@@ -646,7 +646,7 @@ async def cmd_delete(update: Update, context: ContextTypes.DEFAULT_TYPE):
@restricted @restricted
async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE): async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE):
"""/verify — 检测已保存的 Session Key 是否仍然有效""" """/verify — 检测已保存的 Session Key 是否仍然有效,自动删除被封禁的账号"""
accounts = account_store.read_all() accounts = account_store.read_all()
if not accounts: if not accounts:
@@ -693,6 +693,12 @@ async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE):
valid = sum(1 for r in results if r["ok"]) valid = sum(1 for r in results if r["ok"])
invalid = len(results) - valid invalid = len(results) - valid
# 自动删除被封禁的账号
banned_emails = [r["email"] for r in results if r["reason"] == "被封禁"]
deleted_count = 0
if banned_emails:
deleted_count = account_store.delete_by_emails(banned_emails)
text = ( text = (
f"🔑 <b>账号验证结果</b>\n\n" f"🔑 <b>账号验证结果</b>\n\n"
f"✅ 有效: {valid} ❌ 无效: {invalid}\n\n" f"✅ 有效: {valid} ❌ 无效: {invalid}\n\n"
@@ -701,6 +707,10 @@ async def cmd_verify(update: Update, context: ContextTypes.DEFAULT_TYPE):
icon = "" if r["ok"] else "" icon = "" if r["ok"] else ""
text += f"{i}. {icon} <code>{r['email']}</code> — {r['reason']}\n" text += f"{i}. {icon} <code>{r['email']}</code> — {r['reason']}\n"
if deleted_count > 0:
text += f"\n🗑 <b>已自动删除 {deleted_count} 个封禁账号</b>\n"
text += f"📦 剩余 {account_store.count()} 个账号"
if len(text) > 4000: if len(text) > 4000:
text = text[:4000] + "\n...(已截断)" text = text[:4000] + "\n...(已截断)"
@@ -975,6 +985,112 @@ def _register_worker(loop: asyncio.AbstractEventLoop, status_msg, count: int):
_clear_task() _clear_task()
# ============================================================
# 自动注册辅助函数
# ============================================================
def _auto_register_sync(loop: asyncio.AbstractEventLoop, status_msg, count: int = 5) -> bool:
"""同步执行注册流程,返回是否至少注册了 1 个账号。
此函数在后台线程中被调用,用于 /check 和文件上传时自动注册账号。
"""
success_count = 0
try:
mail_pool = MailPool(MAIL_SYSTEMS)
if mail_pool.count == 0:
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, "❌ 没有可用的邮箱系统,无法自动注册!"),
loop,
).result(timeout=10)
return False
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"📭 没有可用账号,正在自动注册 {count} 个...\n"
f"📧 邮箱系统就绪({mail_pool.count} 个)",
),
loop,
).result(timeout=10)
for i in range(count):
if _is_stopped():
break
bar = _progress_bar(i, count)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"📭 自动注册中...\n{bar}\n✅ 成功: {success_count}",
),
loop,
).result(timeout=10)
# Step 1: 创建邮箱
random_prefix = ''.join(random.choices(string.ascii_lowercase + string.digits, k=10))
target_email, mail_sys = mail_pool.create_user(random_prefix)
if not target_email or not mail_sys:
account_store.record_register(False, "邮箱创建失败")
continue
if _is_stopped():
break
# Step 2: 发送 Magic Link
if not attack_claude(target_email):
account_store.record_register(False, "Magic Link 发送失败")
continue
# Step 3: 等待邮件
email_content = mail_sys.wait_for_email(target_email, stop_check=_is_stopped)
if _is_stopped():
break
if not email_content:
account_store.record_register(False, "邮件接收超时")
continue
magic_link = extract_magic_link(email_content)
if not magic_link:
account_store.record_register(False, "Magic Link 解析失败")
continue
if _is_stopped():
break
# Step 4: 交换 SessionKey
account = finalize_login(magic_link)
if account:
success_count += 1
account_store.append(account.email, account.session_key, account.org_uuid)
account_store.record_register(True)
else:
account_store.record_register(False, "SessionKey 交换失败")
# 间隔防止限流
if i < count - 1:
time.sleep(2)
asyncio.run_coroutine_threadsafe(
_edit_or_send(
status_msg,
f"{'⏹ 自动注册已中断' if _is_stopped() else '✅ 自动注册完成'}\n"
f"成功注册 {success_count} 个账号,继续执行检查...",
),
loop,
).result(timeout=10)
except Exception as e:
logger.exception("自动注册异常")
asyncio.run_coroutine_threadsafe(
_edit_or_send(status_msg, f"💥 自动注册异常:{e}"),
loop,
)
return False
return success_count > 0
# ============================================================ # ============================================================
# /check — CC 检查 # /check — CC 检查
# ============================================================ # ============================================================
@@ -1010,6 +1126,27 @@ async def cmd_check(update: Update, context: ContextTypes.DEFAULT_TYPE):
) )
return return
# 检查是否有账号,若无则自动注册 5 个
if account_store.count() == 0:
if not _set_task("自动注册 5 个账号"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
status_msg = await update.message.reply_text(
"📭 没有可用账号,正在自动注册 5 个...",
parse_mode="HTML",
)
loop = asyncio.get_event_loop()
reg_ok = await loop.run_in_executor(
None, _auto_register_sync, loop, status_msg, 5
)
_clear_task()
if not reg_ok:
await _edit_or_send(status_msg, "❌ 自动注册失败,无法执行 CC 检查。")
return
# 从账号池获取空闲账号 # 从账号池获取空闲账号
if len(cards) == 1: if len(cards) == 1:
acquired = account_store.acquire(1) acquired = account_store.acquire(1)
@@ -1141,6 +1278,27 @@ async def handle_document(update: Update, context: ContextTypes.DEFAULT_TYPE):
await update.message.reply_text("❌ 文件太大(最大 1MB") await update.message.reply_text("❌ 文件太大(最大 1MB")
return return
# 检查是否有账号,若无则自动注册 5 个
if account_store.count() == 0:
if not _set_task("自动注册 5 个账号"):
await update.message.reply_text(f"⚠️ 已有任务在运行:{_task_name}")
return
reg_status_msg = await update.message.reply_text(
"📭 没有可用账号,正在自动注册 5 个...",
parse_mode="HTML",
)
loop_reg = asyncio.get_event_loop()
reg_ok = await loop_reg.run_in_executor(
None, _auto_register_sync, loop_reg, reg_status_msg, 5
)
_clear_task()
if not reg_ok:
await _edit_or_send(reg_status_msg, "❌ 自动注册失败,无法执行 CC 检查。")
return
# 从账号池获取空闲账号 # 从账号池获取空闲账号
acquired = account_store.acquire() # 获取所有空闲 acquired = account_store.acquire() # 获取所有空闲

View File

@@ -180,6 +180,38 @@ def delete_by_email(email: str) -> dict | None:
return removed return removed
def delete_by_emails(emails: list[str]) -> int:
"""批量按邮箱删除账号,返回实际删除数量"""
if not emails:
return 0
email_set = set(emails)
with _lock:
try:
with open(_ACCOUNTS_FILE, "r", encoding="utf-8") as f:
lines = [line.strip() for line in f if line.strip()]
except FileNotFoundError:
return 0
remaining = []
deleted = 0
for line in lines:
parts = line.split("|")
if parts[0] in email_set:
deleted += 1
# 同时从 busy 集合中移除
_busy.discard(line)
else:
remaining.append(line)
if deleted > 0:
with open(_ACCOUNTS_FILE, "w", encoding="utf-8") as f:
for line in remaining:
f.write(line + "\n")
return deleted
# ====== 统计数据 ====== # ====== 统计数据 ======
def _load_stats() -> dict: def _load_stats() -> dict: