集成notion;集成拿token;

This commit is contained in:
dela
2026-01-13 10:56:44 +08:00
parent d31ac8628f
commit 2ba443d314
18 changed files with 1624 additions and 991 deletions

View File

@@ -1,419 +0,0 @@
# OpenAI 自动注册 + 支付集成指南
## 🎯 功能概述
现在 `auto_register.py` 已经完全集成了支付功能,可以一键完成:
1.**注册 OpenAI 账号**(自动生成临时邮箱 + 邮箱验证)
2.**生成欧洲账单 URL**EU Billing
3.**自动添加支付方式**SEPA自动生成德国IBAN
---
## 📋 核心模块
### 1. **IBAN 生成器** (`modules/iban_generator.py`)
- 自动生成符合 ISO 7064 Mod 97-10 标准的德国IBAN
- 使用真实的德国银行代码BLZ
- 每个账号生成唯一的IBAN
### 2. **Stripe 支付处理器** (`modules/stripe_payment.py`)
- 使用 `curl_cffi` 模拟真实浏览器
- 支持 SEPA 支付方式
- 完整的支付流程:创建支付方式 → 确认支付 → 轮询状态
### 3. **注册器集成** (`modules/register.py`)
- 新增 `add_payment_method()` 方法
- 共享 HTTPClient 实例,保持 session 一致性
---
## 🚀 使用方法
### **基础注册(不含支付)**
```bash
# 注册1个账号
python auto_register.py
# 注册10个账号
python auto_register.py --count 10
# 指定密码(所有账号使用相同密码)
python auto_register.py -c 5 -p "MyPassword123!"
# 保存到指定文件
python auto_register.py -c 10 -o my_accounts.json
```
---
### **注册 + 生成账单 URL**
```bash
# 注册并生成欧洲账单URL
python auto_register.py --eu-billing
# 批量注册10个账号并生成账单
python auto_register.py -c 10 --eu-billing
```
---
### **注册 + 账单 + 自动添加支付(推荐)**
```bash
# 🔥 完整流程:注册 + 账单 + 支付
python auto_register.py --eu-billing --add-payment
# 🔥 批量10个账号完整流程
python auto_register.py -c 10 --eu-billing --add-payment
# 🔥 自定义支付信息
python auto_register.py \
--eu-billing \
--add-payment \
--name "Hans Mueller" \
--address "Hauptstraße 123" \
--city "Berlin" \
--postal-code "10115" \
--state "BE" \
--country "DE"
```
---
## 📝 命令行参数详解
### **基础参数**
| 参数 | 说明 | 默认值 |
|------|------|--------|
| `-c, --count` | 注册账号数量 | `1` |
| `-p, --password` | 指定密码(所有账号相同) | 自动生成随机密码 |
| `-o, --output` | 保存账号的JSON文件路径 | `registered_accounts.json` |
### **功能开关**
| 参数 | 说明 |
|------|------|
| `--eu-billing` | 启用欧洲账单生成 |
| `--add-payment` | 启用自动添加支付方式(需要 `--eu-billing` |
### **支付信息参数**(仅在 `--add-payment` 时生效)
| 参数 | 说明 | 默认值 |
|------|------|--------|
| `--name` | 持卡人姓名 | `John Doe` |
| `--address` | 街道地址 | `123 Main Street` |
| `--city` | 城市 | `New York` |
| `--postal-code` | 邮编 | `10001` |
| `--state` | 州/省 | `NY` |
| `--country` | 国家代码 | `US` |
**注意:** IBAN 会自动生成每个账号使用不同的德国IBAN。
---
## 💡 实际使用示例
### **示例 1: 快速注册 1 个账号(含支付)**
```bash
python auto_register.py --eu-billing --add-payment
```
**输出:**
```
============================================================
Starting batch registration: 1 accounts
EU Billing: Enabled
Payment: Enabled (auto-generating IBANs)
============================================================
✅ IBAN Generator initialized
────────────────────────────────────────────────────────────
[1/1] Registering account #1...
────────────────────────────────────────────────────────────
🔢 Generated IBAN: DE22700201007412345678
✅ [1.1] Visited ChatGPT (200)
✅ [1.2] CSRF token extracted
✅ [1.4] Got OAuth URL
...
✅ [3] Registration successful!
✅ [4.3] Got verification code: 123456
✅ [4.4] Email verified successfully!
✅ [5] Access token retrieved
✅ EU billing URL generated
URL: https://pay.openai.com/c/pay/cs_live_xxx...
🔐 Adding payment method...
INFO:modules.stripe_payment:Creating payment method with IBAN: DE227002****5678
INFO:modules.stripe_payment:✅ Payment method created: pm_xxx
INFO:modules.stripe_payment:Confirming payment with method: pm_xxx
INFO:modules.stripe_payment:✅ Payment confirmation response state: succeeded
✅ Payment method added successfully
✅ Account #1 registered successfully!
Email: abc123@temp.mail
Password: Xy9$zK2@pQ4!mN8&
Checkout URL: https://pay.openai.com/c/pay/cs_live_xxx...
Payment: ✅ Added
IBAN: DE22700201007412345678
```
---
### **示例 2: 批量注册 10 个账号(含支付)**
```bash
python auto_register.py -c 10 --eu-billing --add-payment -o team_accounts.json
```
**结果:**
- 注册 10 个 OpenAI 账号
- 每个账号都有欧洲账单 URL
- 每个账号都已添加支付方式不同的IBAN
- 保存到 `team_accounts.json`
**JSON 输出格式:**
```json
[
{
"email": "abc123@temp.mail",
"password": "Xy9$zK2@pQ4!mN8&",
"verified": true,
"checkout_url": "https://pay.openai.com/c/pay/cs_live_xxx...",
"payment_added": true,
"iban": "DE22700201007412345678"
},
{
"email": "def456@temp.mail",
"password": "Zw3@hJ5!tL9$xM2&",
"verified": true,
"checkout_url": "https://pay.openai.com/c/pay/cs_live_yyy...",
"payment_added": true,
"iban": "DE84601202002216834329"
}
...
]
```
---
### **示例 3: 使用德国地址信息**
```bash
python auto_register.py \
-c 5 \
--eu-billing \
--add-payment \
--name "Hans Mueller" \
--address "Hauptstraße 123" \
--city "Berlin" \
--postal-code "10115" \
--country "DE"
```
---
## 🔧 技术细节
### **IBAN 生成算法**
```python
# 生成流程(自动执行):
1. 随机选择德国银行代码BLZ 60120200
2. 生成 10 位随机账号 2216834329
3. 拼接 BBAN60120200 + 2216834329
4. 计算 ISO 7064 Mod 97-10 校验位
5. 最终 IBANDE84601202002216834329
```
### **支付流程3步**
```
Step 1: 创建支付方式
POST https://api.stripe.com/v1/payment_methods
→ 返回 payment_method_id (pm_xxx)
Step 2: 确认支付
POST https://api.stripe.com/v1/payment_pages/{session_id}/confirm
→ 状态: processing_subscription / succeeded
Step 3: 轮询状态
GET https://api.stripe.com/v1/payment_pages/{session_id}/poll
→ 最终状态: succeeded ✅
```
### **关键特性**
**使用 curl_cffi**:完美模拟 Chrome 浏览器,绕过 Cloudflare
**共享 Session**:注册和支付使用同一个 HTTPClient保持 cookies 一致
**自动生成 IBAN**每个账号使用不同的德国IBAN避免重复
**完整日志**:详细的调试信息,方便排查问题
---
## ⚠️ 注意事项
### **1. 临时邮箱配置**
确保 `config.py` 中配置了临时邮箱:
```python
TEMPMAIL_CONFIG = {
'api_base_url': 'https://your.tempmail.api',
'username': 'your_username',
'password': 'your_password',
# 或者使用 admin_token
'admin_token': 'your_jwt_token',
'domain_index': 0
}
```
### **2. 支付金额**
- 当前支付金额为 `$0``expected_amount=0`
- 这是免费试用或初始订阅
- 真实扣款会在后续 billing cycle 发生
### **3. 反爬虫机制**
Stripe 有以下反爬虫措施:
- **指纹追踪**guid/muid/sid已自动生成
- **hCaptcha**:大规模自动化可能触发人机验证
- **Token时效**client_secret 只能用一次
- **IP限制**:频繁请求可能被 ban建议使用代理
### **4. 测试建议**
```bash
# 先测试1个账号
python auto_register.py --eu-billing --add-payment
# 确认成功后再批量
python auto_register.py -c 10 --eu-billing --add-payment
```
---
## 📊 成功率优化
### **提高成功率的方法**
1. **使用代理**(避免 IP 被 ban
2. **降低并发**(不要一次注册太多)
3. **间隔时间**(每个账号之间等待几秒)
4. **监控日志**`DEBUG=True` 查看详细信息)
### **失败排查**
如果支付失败,检查:
```bash
# 查看详细日志
export DEBUG=True
python auto_register.py --eu-billing --add-payment
# 常见错误:
# 1. "Invalid IBAN" → IBAN生成器问题已修复
# 2. "Payment method creation failed" → Stripe API 限制
# 3. "Timeout" → 网络问题或 Stripe 服务慢
```
---
## 🎉 完整示例命令
```bash
# 🔥 推荐批量注册10个完整账号
python auto_register.py \
--count 10 \
--eu-billing \
--add-payment \
--output production_accounts.json
# 🔥 使用德国信息
python auto_register.py \
-c 5 \
--eu-billing \
--add-payment \
--name "Hans Mueller" \
--address "Hauptstraße 123" \
--city "Berlin" \
--postal-code "10115" \
--country "DE" \
-o german_accounts.json
# 🔥 单个账号快速测试
python auto_register.py --eu-billing --add-payment
```
---
## 📚 相关文件
```
autoreg/
├── auto_register.py # 主脚本(已集成支付)
├── modules/
│ ├── register.py # 注册器(含 add_payment_method
│ ├── stripe_payment.py # Stripe 支付处理器
│ ├── iban_generator.py # IBAN 生成器
│ ├── http_client.py # HTTP 客户端curl_cffi
│ └── billing.py # 欧洲账单生成器
├── config.py # 配置文件
└── PAYMENT_GUIDE.md # 本文档
```
---
## 🛠️ 故障排除
### **问题 1: ModuleNotFoundError: No module named 'curl_cffi'**
```bash
pip install curl_cffi
```
### **问题 2: IBAN 生成器导入失败**
```bash
# 确保文件存在
ls modules/iban_generator.py
# 如果不存在,从 reference 复制
cp reference/iban.py modules/iban_generator.py
```
### **问题 3: 支付一直失败**
```bash
# 1. 检查 Stripe 公钥是否正确
# 2. 确认账单 URL 格式正确cs_live_xxx
# 3. 查看详细日志
DEBUG=True python auto_register.py --eu-billing --add-payment
```
---
## ✅ 总结
现在你的 `auto_register.py` 已经是一个**完整的 OpenAI 账号注册 + 支付自动化工具**
**一行命令搞定所有事情:**
```bash
python auto_register.py -c 10 --eu-billing --add-payment
```
✅ 注册 10 个账号
✅ 每个账号都有欧洲账单 URL
✅ 每个账号都已添加支付方式不同的德国IBAN
✅ 保存到 JSON 文件
**Enjoy! 🚀**

View File

@@ -16,7 +16,7 @@ TEMPMAIL_CONFIG = {
# 'admin_token': 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...',
# 域名选择(0=第1个域名, 1=第2个, 2=第3个)
'domain_index': 1, # 改成 0, 1, 或 2 来选择不同的域名后缀
'domain_index': 2, # 改成 0, 1, 或 2 来选择不同的域名后缀
}
# SDK 路径

View File

@@ -1,27 +0,0 @@
FROM python:3.11-slim
WORKDIR /app
# 安装系统依赖
RUN apt-get update && apt-get install -y \
nodejs \
npm \
&& rm -rf /var/lib/apt/lists/*
# 复制依赖文件
COPY requirements.txt requirements_bot.txt ./
# 安装 Python 依赖
RUN pip install --no-cache-dir -r requirements.txt -r requirements_bot.txt
# 复制应用代码
COPY . .
# 设置环境变量
ENV PYTHONUNBUFFERED=1
# 暴露端口 (如果需要健康检查)
EXPOSE 8080
# 启动命令
CMD ["python", "tg_bot.py"]

View File

@@ -1,21 +0,0 @@
version: '3.8'
services:
telegram-bot:
build: .
container_name: openai-registration-bot
restart: unless-stopped
environment:
- TELEGRAM_BOT_TOKEN=${TELEGRAM_BOT_TOKEN}
- ALLOWED_USER_IDS=${ALLOWED_USER_IDS:-}
volumes:
- ./config.py:/app/config.py:ro
- ./logs:/app/logs
command: python tg_bot.py
# 可选: 健康检查
healthcheck:
test: ["CMD", "python", "-c", "import telegram; print('OK')"]
interval: 30s
timeout: 10s
retries: 3

View File

@@ -1,63 +0,0 @@
#!/bin/bash
# Telegram Bot 快速启动脚本
set -e
echo "🚀 Starting OpenAI Registration Telegram Bot..."
echo ""
# 检查环境变量
if [ -z "$TELEGRAM_BOT_TOKEN" ]; then
echo "❌ Error: TELEGRAM_BOT_TOKEN not set!"
echo ""
echo "Please set your Telegram Bot token:"
echo " export TELEGRAM_BOT_TOKEN='your_bot_token_here'"
echo ""
echo "Get token from: https://t.me/BotFather"
exit 1
fi
# 检查 Python
if ! command -v python3 &> /dev/null; then
echo "❌ Error: Python 3 not found!"
exit 1
fi
# 检查依赖
echo "📦 Checking dependencies..."
if ! python3 -c "import telegram" 2>/dev/null; then
echo "⚠️ Installing dependencies..."
pip install -q -r requirements_bot.txt
echo "✅ Dependencies installed"
else
echo "✅ Dependencies OK"
fi
# 检查配置
if ! python3 -c "from config import TEMPMAIL_CONFIG; assert 'your.tempmail.domain' not in TEMPMAIL_CONFIG.get('api_base_url', '')" 2>/dev/null; then
echo "❌ Error: config.py not configured!"
echo ""
echo "Please configure config.py with your tempmail settings"
exit 1
fi
echo "✅ Configuration OK"
echo ""
# 显示权限信息
if [ -n "$ALLOWED_USER_IDS" ]; then
echo "🔐 Access Control: Enabled"
echo " Allowed Users: $ALLOWED_USER_IDS"
else
echo "⚠️ Access Control: Disabled (Public Bot)"
echo " Set ALLOWED_USER_IDS to restrict access"
fi
echo ""
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo "✅ Bot starting..."
echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"
echo ""
# 启动 bot
python3 tg_bot.py

View File

@@ -1,16 +0,0 @@
[Unit]
Description=OpenAI Registration Telegram Bot
After=network.target
[Service]
Type=simple
User=your_username
WorkingDirectory=/path/to/autoreg
Environment="TELEGRAM_BOT_TOKEN=your_bot_token_here"
Environment="ALLOWED_USER_IDS=123456789"
ExecStart=/usr/bin/python3 /path/to/autoreg/tg_bot.py
Restart=on-failure
RestartSec=10s
[Install]
WantedBy=multi-user.target

16
flow/__init__.py Normal file
View File

@@ -0,0 +1,16 @@
"""
Flow 包 - 完整注册流程集合
包含以下流程模块:
- flow_complete: 完整注册流程(注册 + 账单 + 支付 + Notion
- flow_email_token: 邮箱和 Token 获取流程
- flow_stripe_payment: Stripe 支付流程
- flow_autodone_notion: 自动完成并保存到 Notion
"""
from .flow_complete import CompleteRegistrationFlow, create_flow
__all__ = [
'CompleteRegistrationFlow',
'create_flow',
]

View File

@@ -0,0 +1,301 @@
#!/usr/bin/env python3
"""完整流程测试:注册 → 获取Access Token → 生成账单链接 → 支付 → 保存到 Notion"""
import os
import sys
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from modules.billing import EUBillingGenerator
from modules.iban_generator import GermanIbanGenerator
from modules.notion_client import NotionClient
from config import TEMPMAIL_CONFIG, DEBUG
def complete_registration_flow():
"""
完整流程:
1. 注册账号(自动生成临时邮箱)
2. 获取 Access Token
3. 生成 EU 账单链接
4. 执行 Stripe 支付(使用 register.py 的 add_payment_method
5. 保存信息到 Notion
"""
print("=" * 80)
print("🚀 完整注册流程:注册 → 支付 → 保存到 Notion")
print("=" * 80)
print()
# ==================== 步骤 0: 检查环境变量 ====================
print("[Step 0] 检查环境变量...")
notion_token = os.environ.get('NOTION_TOKEN')
database_id = os.environ.get('DATA_SOURCE_ID')
if not notion_token:
print("❌ 缺少 NOTION_TOKEN 环境变量")
print(" 请在 .env 文件中设置或运行: export NOTION_TOKEN='your-token'")
return False
if not database_id:
print("❌ 缺少 DATA_SOURCE_ID 环境变量")
print(" 请在 .env 文件中设置或运行: export DATA_SOURCE_ID='your-database-id'")
return False
print(f"✅ NOTION_TOKEN: {notion_token[:20]}...")
print(f"✅ DATA_SOURCE_ID: {database_id}")
print()
# ==================== 步骤 1: 初始化客户端 ====================
print("[Step 1] 初始化客户端...")
# 1.1 初始化临时邮箱客户端
try:
tempmail_client = TempMailClient(
api_base_url=TEMPMAIL_CONFIG['api_base_url'],
username=TEMPMAIL_CONFIG.get('username'),
password=TEMPMAIL_CONFIG.get('password'),
admin_token=TEMPMAIL_CONFIG.get('admin_token')
)
print(f"✅ 临时邮箱客户端初始化成功")
except Exception as e:
print(f"❌ 临时邮箱客户端初始化失败: {e}")
return False
# 1.2 初始化注册器
try:
registrar = OpenAIRegistrar(tempmail_client=tempmail_client)
print(f"✅ 注册器初始化成功")
except Exception as e:
print(f"❌ 注册器初始化失败: {e}")
return False
# 1.3 初始化 Notion 客户端
try:
notion_client = NotionClient(
token=notion_token,
database_id=database_id
)
print(f"✅ Notion 客户端初始化成功")
except Exception as e:
print(f"❌ Notion 客户端初始化失败: {e}")
return False
print()
# ==================== 步骤 2: 注册账号 ====================
print("[Step 2] 开始注册 OpenAI 账号...")
password = "AutoReg2025!@#" # 可自定义密码
try:
reg_result = registrar.register_with_auto_email(password=password)
except Exception as e:
print(f"❌ 注册过程出现异常: {e}")
import traceback
traceback.print_exc()
return False
if not reg_result.get('success'):
print(f"❌ 注册失败: {reg_result.get('error')}")
return False
email = reg_result.get('email')
print(f"✅ 注册成功!")
print(f" Email: {email}")
print(f" Password: {password}")
print(f" Verified: {reg_result.get('verified', False)}")
print()
# ==================== 步骤 3: 获取 Access Token ====================
print("[Step 3] 获取 Access Token...")
try:
access_token = registrar._step5_get_access_token()
print(f"✅ Access Token 获取成功")
print(f" Token: {access_token[:50]}...")
except Exception as e:
print(f"❌ 获取 Access Token 失败: {e}")
import traceback
traceback.print_exc()
return False
print()
# ==================== 步骤 4: 生成 EU 账单链接 ====================
print("[Step 4] 生成 EU 账单链接...")
try:
billing_generator = EUBillingGenerator()
billing_result = billing_generator.generate_checkout_url(access_token)
if not billing_result.success:
print(f"❌ 生成账单链接失败: {billing_result.error}")
return False
checkout_url = billing_result.checkout_url
print(f"✅ 账单链接生成成功")
print(f" URL: {checkout_url[:80]}...")
except Exception as e:
print(f"❌ 生成账单链接时出现异常: {e}")
import traceback
traceback.print_exc()
return False
print()
# ==================== 步骤 5: 执行 Stripe 支付 ====================
print("[Step 5] 执行 Stripe 支付...")
# 5.1 生成德国 IBAN
try:
iban_generator = GermanIbanGenerator()
iban = iban_generator.generate(1)[0]
print(f"✅ 已生成德国 IBAN: {iban}")
except Exception as e:
print(f"❌ 生成 IBAN 失败: {e}")
return False
# 5.2 准备支付信息
payment_info = {
"iban": iban,
"name": "Test User",
"email": email,
"address_line1": "123 Main Street",
"city": "New York",
"postal_code": "10001",
"state": "NY",
"country": "US"
}
print(f" 使用支付信息:")
print(f" IBAN: {iban[:8]}****{iban[-4:]}")
print(f" 姓名: {payment_info['name']}")
print(f" 地址: {payment_info['address_line1']}, {payment_info['city']}")
# 5.3 使用 registrar.add_payment_method 执行支付
payment_success = False
try:
payment_result = registrar.add_payment_method(
checkout_session_url=checkout_url,
iban=payment_info['iban'],
name=payment_info['name'],
email=payment_info['email'],
address_line1=payment_info['address_line1'],
city=payment_info['city'],
postal_code=payment_info['postal_code'],
state=payment_info['state'],
country=payment_info['country']
)
if payment_result.get('success'):
print(f"✅ 支付成功!")
payment_success = True
else:
print(f"❌ 支付失败: {payment_result.get('error')}")
print(f"⚠️ 将保存基本账号信息到 Notion不含账单链接...")
except Exception as e:
print(f"❌ 支付过程出现异常: {e}")
import traceback
traceback.print_exc()
print(f"⚠️ 将保存基本账号信息到 Notion不含账单链接...")
print()
# ==================== 步骤 6: 保存到 Notion ====================
print("[Step 6] 保存账号信息到 Notion...")
try:
notion_result = notion_client.add_account(
email=email,
password=password,
billing_url=checkout_url if payment_success else None,
person=None, # 暂时不设置人员
status="未开始",
done_status="未开始"
)
if notion_result.get('success'):
print(f"✅ 账号信息已保存到 Notion!")
print(f" Page ID: {notion_result.get('data', {}).get('id', 'N/A')}")
print()
print("=" * 80)
print("🎉 完整流程执行成功!")
print("=" * 80)
print()
print("📋 账号摘要:")
print(f" 邮箱: {email}")
print(f" 密码: {password}")
print(f" 支付状态: {'✅ 已完成' if payment_success else '❌ 失败'}")
print(f" Notion 状态: ✅ 已保存")
if payment_success:
print(f" 账单链接: {checkout_url[:60]}...")
print("=" * 80)
return True
else:
print(f"❌ 保存到 Notion 失败: {notion_result.get('error')}")
print(f"⚠️ 账号已注册但未保存到 Notion")
print()
print("📋 账号信息(请手动保存):")
print(f" 邮箱: {email}")
print(f" 密码: {password}")
if payment_success:
print(f" 账单链接: {checkout_url}")
else:
print(f" 账单链接: N/A支付失败")
return False
except Exception as e:
print(f"❌ 保存到 Notion 时出现异常: {e}")
import traceback
traceback.print_exc()
print()
print("📋 账号信息(请手动保存):")
print(f" 邮箱: {email}")
print(f" 密码: {password}")
if payment_success:
print(f" 账单链接: {checkout_url}")
else:
print(f" 账单链接: N/A支付失败")
return False
def main():
"""主函数"""
print()
print("=" * 80)
print("OpenAI 账号完整注册流程测试")
print("=" * 80)
print()
print("此脚本将执行以下操作:")
print(" 1. 自动生成临时邮箱并注册 OpenAI 账号")
print(" 2. 获取账号的 Access Token")
print(" 3. 生成 EU 团队计划账单链接")
print(" 4. 使用德国 IBAN 完成 Stripe 支付")
print(" 5. 将账号信息保存到 Notion 数据库")
print()
print("⚠️ 请确保:")
print(" - 已在 .env 文件中配置 NOTION_TOKEN 和 DATA_SOURCE_ID")
print(" - 已在 config.py 中配置临时邮箱信息")
print(" - 网络连接正常")
print()
confirm = input("确认开始测试? (y/N): ").strip().lower()
if confirm != 'y':
print("❌ 测试取消")
sys.exit(0)
print()
# 执行完整流程
success = complete_registration_flow()
print()
if success:
print("✅ 测试完成 - 所有步骤成功")
sys.exit(0)
else:
print("❌ 测试完成 - 部分步骤失败(请查看上方日志)")
sys.exit(1)
if __name__ == "__main__":
main()

285
flow/flow_complete.py Normal file
View File

@@ -0,0 +1,285 @@
#!/usr/bin/env python3
"""
完整注册流程封装
包含:注册 → Access Token → 账单生成 → 支付 → Notion 保存
"""
import os
from typing import Dict, Optional
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from modules.billing import EUBillingGenerator
from modules.iban_generator import GermanIbanGenerator
from modules.notion_client import NotionClient
from config import TEMPMAIL_CONFIG, DEBUG
class CompleteRegistrationFlow:
"""完整注册流程管理器"""
def __init__(self, tempmail_client: TempMailClient):
"""
初始化流程管理器
Args:
tempmail_client: 临时邮箱客户端实例
"""
self.tempmail_client = tempmail_client
self.registrar = None
self.notion_client = None
# 检查并初始化 Notion 客户端
notion_token = os.environ.get('NOTION_TOKEN')
database_id = os.environ.get('DATA_SOURCE_ID')
if notion_token and database_id:
try:
self.notion_client = NotionClient(
token=notion_token,
database_id=database_id
)
if DEBUG:
print(f"✅ Notion 客户端初始化成功")
except Exception as e:
if DEBUG:
print(f"⚠️ Notion 客户端初始化失败: {e}")
def register_basic(self, password: str) -> Dict:
"""
基础注册(仅注册+验证邮箱)
Args:
password: 账号密码
Returns:
注册结果字典
"""
try:
# 初始化注册器
self.registrar = OpenAIRegistrar(tempmail_client=self.tempmail_client)
# 执行注册
result = self.registrar.register_with_auto_email(password=password)
if result.get('success'):
return {
'success': True,
'email': result.get('email'),
'password': password,
'verified': result.get('verified', False)
}
else:
return {
'success': False,
'error': result.get('error', 'Unknown error')
}
except Exception as e:
return {
'success': False,
'error': str(e)
}
def register_with_billing(self, password: str) -> Dict:
"""
注册 + 生成账单链接
Args:
password: 账号密码
Returns:
包含账单链接的注册结果
"""
try:
# 先执行基础注册
result = self.register_basic(password)
if not result.get('success'):
return result
# 获取 Access Token
try:
access_token = self.registrar._step5_get_access_token()
result['access_token'] = access_token
if DEBUG:
print(f"✅ Access Token 获取成功")
except Exception as e:
result['billing_error'] = f"获取 Access Token 失败: {str(e)}"
return result
# 生成账单链接
try:
billing_generator = EUBillingGenerator()
billing_result = billing_generator.generate_checkout_url(access_token)
if billing_result.success:
result['checkout_url'] = billing_result.checkout_url
if DEBUG:
print(f"✅ 账单链接生成成功")
else:
result['billing_error'] = billing_result.error
except Exception as e:
result['billing_error'] = str(e)
return result
except Exception as e:
return {
'success': False,
'error': str(e)
}
def register_with_payment(
self,
password: str,
payment_info: Optional[Dict] = None,
save_to_notion: bool = True
) -> Dict:
"""
完整注册流程(注册 + 账单 + 支付 + Notion
Args:
password: 账号密码
payment_info: 支付信息字典,包含 name, address_line1, city, postal_code, state, country
save_to_notion: 是否保存到 Notion
Returns:
完整的注册结果
"""
try:
# 先执行账单注册
result = self.register_with_billing(password)
if not result.get('success'):
return result
# 检查是否有账单链接
checkout_url = result.get('checkout_url')
if not checkout_url:
result['payment_error'] = result.get('billing_error', '未生成账单链接')
if save_to_notion and self.notion_client:
self._save_to_notion(result, password)
return result
# 生成 IBAN
try:
iban_generator = GermanIbanGenerator()
iban = iban_generator.generate(1)[0]
result['iban'] = iban
if DEBUG:
print(f"✅ 已生成 IBAN: {iban}")
except Exception as e:
result['payment_error'] = f"生成 IBAN 失败: {str(e)}"
if save_to_notion and self.notion_client:
self._save_to_notion(result, password)
return result
# 准备支付信息(使用默认值或用户提供的值)
default_payment_info = {
"name": "John Doe",
"address_line1": "123 Main Street",
"city": "New York",
"postal_code": "10001",
"state": "NY",
"country": "US"
}
if payment_info:
default_payment_info.update(payment_info)
# 执行支付
try:
payment_result = self.registrar.add_payment_method(
checkout_session_url=checkout_url,
iban=iban,
name=default_payment_info['name'],
email=result['email'],
address_line1=default_payment_info['address_line1'],
city=default_payment_info['city'],
postal_code=default_payment_info['postal_code'],
state=default_payment_info['state'],
country=default_payment_info['country']
)
if payment_result.get('success'):
result['payment_added'] = True
if DEBUG:
print(f"✅ 支付方式添加成功")
else:
result['payment_error'] = payment_result.get('error', 'Payment failed')
except Exception as e:
result['payment_error'] = str(e)
# 保存到 Notion
if save_to_notion and self.notion_client:
self._save_to_notion(result, password)
return result
except Exception as e:
return {
'success': False,
'error': str(e)
}
def _save_to_notion(self, result: Dict, password: str) -> None:
"""
保存账号信息到 Notion
Args:
result: 注册结果字典
password: 账号密码
"""
if not self.notion_client:
if DEBUG:
print(f"⚠️ Notion 客户端未初始化,跳过保存")
return
try:
email = result.get('email')
checkout_url = result.get('checkout_url') if result.get('payment_added') else None
# 根据支付结果设置 autoStatus
# payment_added 为 True 表示支付成功
auto_status = "success" if result.get('payment_added') else "fail"
notion_result = self.notion_client.add_account(
email=email,
password=password,
billing_url=checkout_url,
person=None,
status="未开始", # 车状态
done_status="done", # done 状态
auto_status=auto_status # 自动检测支付状态
)
if notion_result.get('success'):
result['notion_saved'] = True
if DEBUG:
print(f"✅ 已保存到 Notion (autoStatus: {auto_status})")
else:
result['notion_error'] = notion_result.get('error', 'Failed to save')
except Exception as e:
result['notion_error'] = str(e)
if DEBUG:
print(f"⚠️ 保存到 Notion 失败: {e}")
def create_flow(tempmail_client: TempMailClient) -> CompleteRegistrationFlow:
"""
工厂函数:创建完整流程实例
Args:
tempmail_client: 临时邮箱客户端
Returns:
CompleteRegistrationFlow 实例
"""
return CompleteRegistrationFlow(tempmail_client)

545
flow/flow_email_token.py Normal file
View File

@@ -0,0 +1,545 @@
#!/usr/bin/env python3
"""
OpenAI 登录测试脚本
根据抓包数据实现完整登录流程
"""
import json
import uuid
import secrets
import sys
import os
from urllib.parse import urlparse, parse_qs
# 添加项目根目录到 Python 路径
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
try:
from curl_cffi.requests import Session
USE_CURL = True
print("✅ Using curl_cffi")
except ImportError:
import requests
Session = requests.Session
USE_CURL = False
print("⚠️ Using requests (may fail)")
# 导入项目模块
try:
from modules.fingerprint import BrowserFingerprint
from modules.http_client import HTTPClient
from modules.sentinel_solver import SentinelSolver
from modules.pow_solver import ProofOfWorkSolver
MODULES_AVAILABLE = True
print("✅ Project modules loaded")
except ImportError as e:
MODULES_AVAILABLE = False
print(f"⚠️ Project modules not available: {e}")
class OpenAILogin:
"""OpenAI 登录客户端"""
def __init__(self):
self.device_id = str(uuid.uuid4())
self.auth_session_logging_id = str(uuid.uuid4())
# 使用项目模块(如果可用)
if MODULES_AVAILABLE:
self.fingerprint = BrowserFingerprint(session_id=self.device_id)
self.http_client = HTTPClient(self.fingerprint)
self.session = self.http_client.session # 保持兼容性
self.sentinel_solver = SentinelSolver(self.fingerprint)
self.pow_solver = ProofOfWorkSolver()
print("✅ Using HTTPClient with project modules")
else:
# 降级使用原始 session
self.session = Session(impersonate='chrome110') if USE_CURL else Session()
self.fingerprint = None
self.http_client = None
self.sentinel_solver = None
self.pow_solver = None
print("⚠️ Using fallback session")
def get_headers(self, host='chatgpt.com', **extras):
"""生成请求头"""
# 如果有项目的 fingerprint使用它
if self.fingerprint:
headers = self.fingerprint.get_headers(host=host)
headers.update(extras)
return headers
# 否则使用备用 headers
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36',
'Accept-Language': 'en-US,en;q=0.9',
'Sec-Ch-Ua': '"Chromium";v="131", "Not_A Brand";v="24"',
'Sec-Ch-Ua-Mobile': '?0',
'Sec-Ch-Ua-Platform': '"Windows"',
}
if host:
headers['Host'] = host
headers.update(extras)
return headers
def step1_get_csrf(self):
"""Step 1: 获取 CSRF token"""
print("\n[1] Getting CSRF token...")
import time
# 1.1 访问首页(获取初始 cookies
url = "https://chatgpt.com/"
headers = self.get_headers(host='chatgpt.com')
headers.update({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'none',
'Sec-Fetch-User': '?1',
'Upgrade-Insecure-Requests': '1',
})
resp = self.session.get(url, headers=headers, timeout=30)
print(f" [1.1] Visited chatgpt.com: {resp.status_code}")
# 如果遇到 Cloudflare等待后重试
if resp.status_code == 403:
print(f" ⚠️ Cloudflare challenge detected, waiting 5s...")
time.sleep(5)
resp = self.session.get(url, headers=headers, timeout=30)
print(f" [1.1 Retry] Status: {resp.status_code}")
# 1.2 获取 CSRF
csrf_url = "https://chatgpt.com/api/auth/csrf"
headers = self.get_headers(host='chatgpt.com')
headers.update({
'Accept': '*/*',
'Referer': 'https://chatgpt.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
})
resp = self.session.get(csrf_url, headers=headers, timeout=30)
print(f" [1.2] Got CSRF response: {resp.status_code}")
# 提取 CSRF token
csrf_token = None
# 方法1从 cookie 提取
csrf_cookie = self.session.cookies.get('__Host-next-auth.csrf-token', '')
if csrf_cookie:
from urllib.parse import unquote
csrf_cookie = unquote(csrf_cookie)
if '|' in csrf_cookie:
csrf_token = csrf_cookie.split('|')[0]
# 方法2从响应 JSON 提取
if not csrf_token:
try:
data = resp.json()
csrf_token = data.get('csrfToken', '')
except:
pass
if csrf_token:
print(f" ✅ CSRF token: {csrf_token[:30]}...")
return csrf_token
else:
raise Exception("Failed to get CSRF token")
def step2_signin_request(self, email: str, csrf_token: str):
"""Step 2: 发起登录请求(获取 OAuth URL"""
print("\n[2] Initiating signin...")
url = "https://chatgpt.com/api/auth/signin/openai"
# Query 参数
params = {
'prompt': 'login',
'ext-oai-did': self.device_id,
'auth_session_logging_id': self.auth_session_logging_id,
'screen_hint': 'login_or_signup', # 或 'login'
'login_hint': email,
}
# POST body
data = {
'callbackUrl': 'https://chatgpt.com/',
'csrfToken': csrf_token,
'json': 'true',
}
# Headers
headers = self.get_headers(host='chatgpt.com')
headers.update({
'Content-Type': 'application/x-www-form-urlencoded',
'Accept': '*/*',
'Origin': 'https://chatgpt.com',
'Referer': 'https://chatgpt.com/',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Site': 'same-origin',
})
resp = self.session.post(
url,
params=params,
data=data,
headers=headers,
allow_redirects=False,
timeout=30
)
print(f" Status: {resp.status_code}")
# 提取 OAuth URL
oauth_url = None
if resp.status_code == 200:
try:
result = resp.json()
oauth_url = result.get('url')
except:
pass
elif resp.status_code in [301, 302, 303, 307, 308]:
oauth_url = resp.headers.get('Location')
if oauth_url:
print(f" ✅ Got OAuth URL")
return oauth_url
else:
raise Exception(f"Failed to get OAuth URL: {resp.status_code} {resp.text}")
def step3_visit_oauth(self, oauth_url: str):
"""Step 3: 访问 OAuth authorize重定向到密码页面"""
print("\n[3] Following OAuth flow...")
headers = self.get_headers(host='auth.openai.com')
headers.update({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Referer': 'https://chatgpt.com/',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'cross-site',
'Upgrade-Insecure-Requests': '1',
})
resp = self.session.get(
oauth_url,
headers=headers,
allow_redirects=True, # 自动跟随重定向到 /log-in/password
timeout=30
)
print(f" Final URL: {resp.url}")
print(f" Status: {resp.status_code}")
# 检查是否获取到必需的 cookies
required_cookies = ['login_session', 'oai-did', 'oai-login-csrf_dev_3772291445']
for cookie_name in required_cookies:
cookie_value = self.session.cookies.get(cookie_name)
if cookie_value:
print(f" ✅ Cookie: {cookie_name}")
else:
print(f" ⚠️ Missing: {cookie_name}")
return resp
def step3_5_generate_sentinel(self):
"""Step 3.5: 生成 Sentinel token如果需要"""
if not MODULES_AVAILABLE or not self.sentinel_solver:
print("\n[3.5] ⚠️ Skipping Sentinel (not available)")
return None
print("\n[3.5] Generating Sentinel token...")
try:
# 生成 requirements token
token_data = self.sentinel_solver.generate_requirements_token()
sentinel_token = json.dumps(token_data)
print(f" ✅ Sentinel token generated")
# 提交到 Sentinel 服务器
url = "https://sentinel.openai.com/backend-api/sentinel/req"
payload = {
"p": token_data['p'],
"id": self.device_id,
"flow": "username_password_login" # 注意:登录用 login注册用 create
}
headers = self.get_headers(host='sentinel.openai.com')
headers.update({
'Content-Type': 'text/plain;charset=UTF-8',
'Origin': 'https://sentinel.openai.com',
'Referer': 'https://sentinel.openai.com/backend-api/sentinel/frame.html',
'Accept': '*/*',
'Sec-Fetch-Site': 'same-origin',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Dest': 'empty',
})
resp = self.session.post(url, json=payload, headers=headers, timeout=30)
if resp.status_code == 200:
data = resp.json()
print(f" ✅ Sentinel accepted (persona: {data.get('persona')})")
# 检查是否需要 PoW
if data.get('proofofwork', {}).get('required'):
print(f" ⚠️ PoW required, solving...")
pow_data = data['proofofwork']
pow_answer = self.pow_solver.solve(pow_data['seed'], pow_data['difficulty'])
# 提交 PoW
pow_payload = {
"p": token_data['p'],
"id": self.device_id,
"answer": pow_answer
}
pow_resp = self.session.post(url, json=pow_payload, headers=headers, timeout=30)
if pow_resp.status_code == 200:
print(f" ✅ PoW accepted")
else:
print(f" ⚠️ PoW failed: {pow_resp.status_code}")
return sentinel_token
else:
print(f" ⚠️ Sentinel failed: {resp.status_code}")
return None
except Exception as e:
print(f" ⚠️ Sentinel error: {e}")
import traceback
traceback.print_exc()
return None
def step4_submit_password(self, email: str, password: str, sentinel_token: str = None):
"""Step 4: 提交密码(验证)"""
print("\n[4] Submitting password...")
# 正确的接口:/api/accounts/password/verify
url = "https://auth.openai.com/api/accounts/password/verify"
# 需要同时传 username 和 password
payload = {
"username": email,
"password": password,
}
# Headers
headers = self.get_headers(host='auth.openai.com')
headers.update({
'Content-Type': 'application/json',
'Accept': 'application/json',
'Origin': 'https://auth.openai.com',
'Referer': 'https://auth.openai.com/log-in/password',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
})
# 添加 Sentinel token如果有
if sentinel_token:
headers['Openai-Sentinel-Token'] = sentinel_token
print(f" ✅ Using Sentinel token")
# 添加 Datadog tracing headers模拟真实请求
headers.update({
'X-Datadog-Trace-Id': str(secrets.randbits(63)),
'X-Datadog-Parent-Id': str(secrets.randbits(63)),
'X-Datadog-Sampling-Priority': '1',
'X-Datadog-Origin': 'rum',
'Traceparent': f'00-0000000000000000{secrets.token_hex(8)}-{secrets.token_hex(8)}-01',
'Tracestate': 'dd=s:1;o:rum',
})
resp = self.session.post(
url,
json=payload,
headers=headers,
allow_redirects=False,
timeout=30
)
print(f" Status: {resp.status_code}")
try:
data = resp.json()
print(f" Response: {json.dumps(data, indent=2)}")
if resp.status_code == 200:
# 检查是否需要跟随 OAuth 回调
continue_url = data.get('continue_url')
if continue_url:
print(f" ✅ Login successful, need to complete OAuth")
return {'success': True, 'continue_url': continue_url}
else:
return {'success': True}
else:
return {'success': False, 'error': data}
except Exception as e:
print(f" ❌ Exception: {e}")
return {'success': False, 'error': str(e)}
def step5_complete_oauth(self, continue_url: str):
"""Step 5: 完成 OAuth 回调(获取 session-token"""
print("\n[5] Completing OAuth callback...")
# 确保是完整 URL
if not continue_url.startswith('http'):
continue_url = f"https://auth.openai.com{continue_url}"
headers = self.get_headers(host='auth.openai.com')
headers.update({
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Referer': 'https://auth.openai.com/log-in/password',
'Sec-Fetch-Dest': 'document',
'Sec-Fetch-Mode': 'navigate',
'Sec-Fetch-Site': 'same-origin',
'Upgrade-Insecure-Requests': '1',
})
resp = self.session.get(
continue_url,
headers=headers,
allow_redirects=True, # 自动跟随重定向到 chatgpt.com
timeout=30
)
print(f" Final URL: {resp.url}")
print(f" Status: {resp.status_code}")
# 检查是否获取到 session-token
session_token = self.session.cookies.get('__Secure-next-auth.session-token')
if session_token:
print(f" ✅ Got session-token: {session_token[:50]}...")
return True
else:
print(f" ❌ No session-token found")
return False
def step6_get_access_token(self):
"""Step 6: 获取 accessToken"""
print("\n[6] Getting access token...")
url = "https://chatgpt.com/api/auth/session"
headers = self.get_headers(host='chatgpt.com')
headers.update({
'Accept': 'application/json',
'Referer': 'https://chatgpt.com/',
'Sec-Fetch-Dest': 'empty',
'Sec-Fetch-Mode': 'cors',
'Sec-Fetch-Site': 'same-origin',
})
resp = self.session.get(url, headers=headers, timeout=30)
print(f" Status: {resp.status_code}")
if resp.status_code == 200:
try:
data = resp.json()
access_token = data.get('accessToken')
if access_token:
print(f" ✅ Access token: {access_token[:50]}...")
return access_token
else:
print(f" ❌ No accessToken in response")
print(f" Response: {json.dumps(data, indent=2)}")
return None
except Exception as e:
print(f" ❌ Failed to parse response: {e}")
return None
else:
print(f" ❌ Failed: {resp.status_code}")
return None
def login(self, email: str, password: str):
"""完整登录流程"""
print(f"\n{'='*60}")
print(f"Starting login for: {email}")
print(f"{'='*60}")
try:
# Step 1: 获取 CSRF token
csrf_token = self.step1_get_csrf()
# Step 2: 发起登录请求
oauth_url = self.step2_signin_request(email, csrf_token)
# Step 3: 访问 OAuth重定向到密码页面
self.step3_visit_oauth(oauth_url)
# Step 3.5: 生成 Sentinel token
sentinel_token = self.step3_5_generate_sentinel()
# Step 4: 提交密码
login_result = self.step4_submit_password(email, password, sentinel_token)
if not login_result.get('success'):
print(f"\n❌ Login failed: {login_result.get('error')}")
return None
# Step 5: 完成 OAuth 回调
continue_url = login_result.get('continue_url')
if continue_url:
oauth_success = self.step5_complete_oauth(continue_url)
if not oauth_success:
print(f"\n❌ OAuth callback failed")
return None
# Step 6: 获取 access token
access_token = self.step6_get_access_token()
if access_token:
print(f"\n{'='*60}")
print(f"✅ LOGIN SUCCESS!")
print(f"{'='*60}")
return access_token
else:
print(f"\n❌ Failed to get access token")
return None
except Exception as e:
print(f"\n❌ Exception during login: {e}")
import traceback
traceback.print_exc()
return None
def main():
"""测试入口"""
# 从用户输入获取账号密码
email = input("Email: ").strip()
password = input("Password: ").strip()
if not email or not password:
print("❌ Email and password are required!")
return
# 创建登录客户端
client = OpenAILogin()
# 执行登录
access_token = client.login(email, password)
if access_token:
print(f"\n📋 Access Token:")
print(access_token)
print(f"\n💾 You can use this token to make API calls")
else:
print(f"\n❌ Login failed")
if __name__ == "__main__":
main()

View File

@@ -6,6 +6,7 @@ Stripe Payment Module Test Script
import sys
from modules.stripe_payment import StripePaymentHandler
from modules.iban_generator import GermanIbanGenerator
def test_basic_flow():
"""测试基本支付流程"""
@@ -35,10 +36,19 @@ def test_basic_flow():
print()
# IBAN德国银行账号
iban = input(" IBAN (德国银行账号,如 DE89370400440532013000): ").strip()
if not iban:
iban = "DE89370400440532013000" # 默认测试IBAN
print(f" 使用默认: {iban}")
print(" IBAN 选项:")
print(" - 直接回车: 自动生成德国 IBAN")
print(" - 输入具体值: 使用指定的 IBAN")
iban_input = input(" IBAN: ").strip()
if not iban_input:
# 自动生成 IBAN
generator = GermanIbanGenerator()
iban = generator.generate(1)[0]
print(f" ✨ 已生成: {iban}")
else:
iban = iban_input
print(f" 使用输入: {iban}")
# 姓名
name = input(" 持卡人姓名 (如 John Doe): ").strip()
@@ -162,13 +172,19 @@ def test_step_by_step():
print(f"✅ Initialized with session: {handler.session_id}")
print()
# 生成 IBAN
generator = GermanIbanGenerator()
iban = generator.generate(1)[0]
print(f"✨ 已自动生成 IBAN: {iban}")
print()
# Step 1: 创建支付方式
print("=" * 70)
print("Step 1: Creating Payment Method...")
print("=" * 70)
payment_method_id = handler.create_payment_method(
iban="DE89370400440532013000",
iban=iban,
name="Test User",
email="test@example.com",
address_line1="123 Test St",
@@ -235,11 +251,17 @@ def quick_test_with_defaults():
print("示例: https://pay.openai.com/c/pay/cs_live_xxx...")
sys.exit(1)
# 生成 IBAN
generator = GermanIbanGenerator()
iban = generator.generate(1)[0]
print(f"✨ 已自动生成 IBAN: {iban}")
print()
try:
handler = StripePaymentHandler(checkout_url)
success = handler.complete_payment(
iban="DE89370400440532013000",
iban=iban,
name="Test User",
email="test@example.com",
address_line1="123 Main Street",

View File

@@ -1,270 +0,0 @@
#!/usr/bin/env python3
"""Batch EU Billing Generator for Registered Accounts
Reads accounts from registered_accounts.json and generates EU billing checkout URLs.
Usage:
python generate_billing.py # Process registered_accounts.json
python generate_billing.py --input custom.json # Process custom file
python generate_billing.py --email user@example.com --password pass # Single account
"""
import argparse
import json
import sys
from typing import List, Dict, Optional
from datetime import datetime
from modules.billing import EUBillingGenerator
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from config import TEMPMAIL_CONFIG, DEBUG
def login_and_get_token(email: str, password: str) -> Optional[str]:
"""Login with email/password and retrieve access token
This function creates a new OpenAIRegistrar instance and performs
the full login flow to obtain an authenticated session, then retrieves
the access token.
Args:
email: Account email
password: Account password
Returns:
Access token or None if login fails
Raises:
Exception: If login or token retrieval fails
"""
if DEBUG:
print(f"\n[Login] Authenticating {email}...")
# Create registrar with no tempmail client (we already have credentials)
registrar = OpenAIRegistrar(tempmail_client=None)
try:
# Execute login flow (Steps 1-4)
# Note: This reuses the registration flow logic, but since the account
# already exists, we just need to authenticate to get the session
registrar._step1_init_through_chatgpt(email)
# Initialize Sentinel (needed for authentication)
registrar._step2_init_sentinel()
registrar._step2_5_submit_sentinel()
# Check if PoW is required
registrar._step2_6_solve_pow()
if hasattr(registrar, 'pow_answer') and registrar.pow_answer:
registrar._step2_7_submit_pow()
# Now retrieve access token from authenticated session
access_token = registrar._step5_get_access_token()
if DEBUG:
print(f"✅ [Login] Authentication successful")
return access_token
except Exception as e:
if DEBUG:
print(f"❌ [Login] Authentication failed: {e}")
raise
def generate_billing_for_account(email: str, password: str) -> Dict:
"""Generate billing URL for a single account
Args:
email: Account email
password: Account password
Returns:
Result dict with checkout_url or error
"""
try:
# Login and get access token
access_token = login_and_get_token(email, password)
# Generate billing URL
billing_gen = EUBillingGenerator()
billing_result = billing_gen.generate_checkout_url(access_token)
if billing_result.success:
return {
'success': True,
'email': email,
'checkout_url': billing_result.checkout_url,
'generated_at': datetime.utcnow().isoformat() + 'Z'
}
else:
return {
'success': False,
'email': email,
'error': billing_result.error
}
except Exception as e:
return {
'success': False,
'email': email,
'error': str(e)
}
def process_batch_accounts(input_file: str, output_file: str):
"""Process accounts from JSON file and generate billing URLs
Args:
input_file: Input JSON file with accounts
output_file: Output JSON file with billing URLs
"""
print(f"\n{'='*60}")
print(f"Batch EU Billing Generator")
print(f"{'='*60}\n")
# Read input file
try:
with open(input_file, 'r', encoding='utf-8') as f:
accounts = json.load(f)
except FileNotFoundError:
print(f"❌ Error: Input file not found: {input_file}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"❌ Error: Invalid JSON in input file: {e}")
sys.exit(1)
if not isinstance(accounts, list):
print(f"❌ Error: Input file must contain a JSON array of accounts")
sys.exit(1)
print(f"📋 Loaded {len(accounts)} accounts from {input_file}\n")
# Process each account
results = []
success_count = 0
failed_count = 0
for i, account in enumerate(accounts, 1):
email = account.get('email')
password = account.get('password')
if not email or not password:
print(f"[{i}/{len(accounts)}] ⚠️ Skipping account (missing email or password)")
failed_count += 1
results.append({
'success': False,
'email': email or 'N/A',
'error': 'Missing email or password in input'
})
continue
print(f"\n{''*60}")
print(f"[{i}/{len(accounts)}] Processing: {email}")
print(f"{''*60}")
result = generate_billing_for_account(email, password)
results.append(result)
if result.get('success'):
success_count += 1
print(f"✅ Billing URL generated")
print(f" URL: {result['checkout_url']}")
else:
failed_count += 1
print(f"❌ Failed: {result.get('error')}")
# Save results to output file
try:
with open(output_file, 'w', encoding='utf-8') as f:
json.dump(results, f, indent=2, ensure_ascii=False)
print(f"\n{'='*60}")
print(f"Results saved to: {output_file}")
except Exception as e:
print(f"\n❌ Error saving results: {e}")
sys.exit(1)
# Print summary
print(f"{'='*60}")
print(f"Total: {len(accounts)} accounts")
print(f"Success: {success_count}")
print(f"Failed: {failed_count}")
print(f"{'='*60}\n")
def main():
parser = argparse.ArgumentParser(
description='Batch EU Billing Generator',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
python generate_billing.py # Process registered_accounts.json
python generate_billing.py -i accounts.json -o billing.json # Custom input/output
python generate_billing.py --email user@example.com --password pass # Single account
"""
)
parser.add_argument(
'-i', '--input',
type=str,
default='registered_accounts.json',
help='Input JSON file with registered accounts (default: registered_accounts.json)'
)
parser.add_argument(
'-o', '--output',
type=str,
default='billing_urls.json',
help='Output JSON file for billing URLs (default: billing_urls.json)'
)
parser.add_argument(
'--email',
type=str,
help='Single account email (requires --password)'
)
parser.add_argument(
'--password',
type=str,
help='Single account password (requires --email)'
)
args = parser.parse_args()
# Single account mode
if args.email or args.password:
if not (args.email and args.password):
print("❌ Error: --email and --password must be used together")
sys.exit(1)
print(f"\n{'='*60}")
print(f"Single Account Billing Generator")
print(f"{'='*60}\n")
result = generate_billing_for_account(args.email, args.password)
print(f"\n{'='*60}")
if result.get('success'):
print(f"✅ SUCCESS")
print(f"{'='*60}")
print(f"Email: {result['email']}")
print(f"Checkout URL:")
print(f" {result['checkout_url']}")
print(f"{'='*60}\n")
sys.exit(0)
else:
print(f"❌ FAILED")
print(f"{'='*60}")
print(f"Email: {result['email']}")
print(f"Error: {result['error']}")
print(f"{'='*60}\n")
sys.exit(1)
# Batch mode
process_batch_accounts(args.input, args.output)
if __name__ == '__main__':
main()

70
main.py
View File

@@ -1,70 +0,0 @@
#!/usr/bin/env python3
# main.py
"""OpenAI Sentinel Bypass - 主入口"""
import sys
import argparse
from modules import OpenAIRegistrar
from config import DEBUG
def main():
parser = argparse.ArgumentParser(
description='OpenAI Registration Automation (Sentinel Bypass)'
)
parser.add_argument(
'email',
help='Email address to register'
)
parser.add_argument(
'password',
help='Password for the account'
)
parser.add_argument(
'--session-id',
help='Custom session ID (default: auto-generated UUID)',
default=None
)
parser.add_argument(
'--quiet',
action='store_true',
help='Suppress debug output'
)
args = parser.parse_args()
# 设置调试模式
if args.quiet:
import config
config.DEBUG = False
# 创建注册器
registrar = OpenAIRegistrar(session_id=args.session_id)
# 执行注册
result = registrar.register(args.email, args.password)
# 输出结果
print("\n" + "="*60)
if result['success']:
print("✓ REGISTRATION SUCCESSFUL")
print(f"Email: {args.email}")
if 'data' in result:
print(f"Response: {result['data']}")
else:
print("✗ REGISTRATION FAILED")
print(f"Error: {result.get('error', 'Unknown error')}")
if 'status_code' in result:
print(f"Status code: {result['status_code']}")
print("="*60)
# 返回退出码
sys.exit(0 if result['success'] else 1)
if __name__ == '__main__':
main()

180
modules/notion_client.py Normal file
View File

@@ -0,0 +1,180 @@
"""Notion API 客户端 - 用于保存注册账号信息到 Notion 数据库"""
import os
import requests
from typing import Dict, Optional
from config import DEBUG
class NotionClient:
"""Notion 数据库客户端"""
def __init__(self, token: Optional[str] = None, database_id: Optional[str] = None):
"""
初始化 Notion 客户端
Args:
token: Notion Integration Token或从环境变量 NOTION_TOKEN 读取)
database_id: Notion 数据库 ID或从环境变量 DATA_SOURCE_ID 读取)
"""
self.token = token or os.environ.get('NOTION_TOKEN')
self.database_id = database_id or os.environ.get('DATA_SOURCE_ID')
if not self.token:
raise ValueError("NOTION_TOKEN not found in environment or parameters")
if not self.database_id:
raise ValueError("DATA_SOURCE_ID not found in environment or parameters")
self.api_url = "https://api.notion.com/v1/pages"
self.api_version = "2025-09-03"
if DEBUG:
print(f"✅ NotionClient initialized")
print(f" Token: {self.token[:20]}...")
print(f" Database ID: {self.database_id}")
def add_account(
self,
email: str,
password: str,
billing_url: Optional[str] = None,
person: Optional[str] = None,
status: str = "未开始",
done_status: str = "done",
auto_status: str = "fail"
) -> Dict:
"""
添加账号信息到 Notion 数据库
Args:
email: 邮箱地址
password: 密码
billing_url: 账单链接(可选)
person: 人员(可选)
status: 车状态(默认 "未开始"
done_status: Done 状态(默认 "done"
auto_status: 自动检测支付状态("success""fail",默认 "fail"
Returns:
API 响应结果
"""
# 构建 properties
properties = {
"email": {
"title": [{"text": {"content": email}}]
},
"密码": {
"rich_text": [{"text": {"content": password}}]
},
"车状态": {
"status": {"name": status}
},
"done": {
"status": {"name": done_status}
},
"autoStatus": {
"rich_text": [{"text": {"content": auto_status}}]
}
}
# 可选字段:账单链接
if billing_url:
properties["账单链接"] = {"url": billing_url}
# 可选字段:人员
if person:
properties["人员"] = {"select": {"name": person}}
# 构建请求体
payload = {
"parent": {
"type": "data_source_id",
"data_source_id": self.database_id
},
"properties": properties
}
# 准备 headers
headers = {
"Authorization": f"Bearer {self.token}",
"Notion-Version": self.api_version,
"Content-Type": "application/json"
}
# 发送请求
try:
resp = requests.post(
self.api_url,
headers=headers,
json=payload,
timeout=30
)
if resp.status_code == 200:
if DEBUG:
print(f"✅ Account added to Notion: {email}")
return {
'success': True,
'data': resp.json()
}
else:
error_msg = f"Failed to add account: {resp.status_code} {resp.text}"
if DEBUG:
print(f"{error_msg}")
return {
'success': False,
'error': error_msg,
'status_code': resp.status_code,
'response': resp.text
}
except Exception as e:
error_msg = f"Exception during Notion API call: {e}"
if DEBUG:
print(f"{error_msg}")
return {
'success': False,
'error': error_msg
}
def add_registered_account(
self,
registration_result: Dict,
password: str,
billing_url: Optional[str] = None
) -> Dict:
"""
从注册结果中提取信息并添加到 Notion
Args:
registration_result: OpenAIRegistrar.register() 或 register_with_auto_email() 的返回结果
password: 账号密码
billing_url: 账单链接(可选)
Returns:
API 响应结果
"""
if not registration_result.get('success'):
return {
'success': False,
'error': 'Registration failed, cannot add to Notion'
}
email = registration_result.get('email')
if not email:
return {
'success': False,
'error': 'No email found in registration result'
}
return self.add_account(
email=email,
password=password,
billing_url=billing_url,
status="未开始",
done_status="未开始"
)

View File

@@ -820,17 +820,6 @@ class OpenAIRegistrar:
if hasattr(cookies, 'get'):
session_token = cookies.get('__Secure-next-auth.session-token')
# 如果没找到,尝试 CookieJar 格式requests
if not session_token:
try:
for cookie in cookies:
if hasattr(cookie, 'name') and cookie.name == '__Secure-next-auth.session-token':
session_token = cookie.value
break
except TypeError:
# cookies 不可迭代,已经尝试过字典访问
pass
if session_token:
if DEBUG:
print(f"✅ [4.6] OAuth flow completed")
@@ -1093,7 +1082,7 @@ class OpenAIRegistrar:
print(f"\n🔐 [Payment] Starting payment method setup...")
print(f" Session URL: {checkout_session_url[:60]}...")
# 初始化Stripe支付处理器共享HTTPClient
# 初始化Stripe支付处理器
payment_handler = StripePaymentHandler(
checkout_session_url=checkout_session_url,
http_client=self.http_client

235
reference/autoinvite.py Normal file
View File

@@ -0,0 +1,235 @@
from flask import Flask, render_template, request, jsonify # type_ignore
import requests
import os
from dotenv import load_dotenv
import logging
import time
from datetime import datetime, timedelta, timezone
load_dotenv()
app = Flask(__name__)
app.secret_key = os.getenv("SECRET_KEY", "dev_secret_key")
logging.getLogger("werkzeug").setLevel(logging.ERROR)
app.logger.setLevel(logging.INFO)
class No404Filter(logging.Filter):
def filter(self, record):
return not (getattr(record, "status_code", None) == 404)
logging.getLogger("werkzeug").addFilter(No404Filter())
AUTHORIZATION_TOKEN = os.getenv("AUTHORIZATION_TOKEN")
ACCOUNT_ID = os.getenv("ACCOUNT_ID")
CF_TURNSTILE_SECRET_KEY = os.getenv("CF_TURNSTILE_SECRET_KEY")
CF_TURNSTILE_SITE_KEY = os.getenv("CF_TURNSTILE_SITE_KEY")
STATS_CACHE_TTL = 60
stats_cache = {"timestamp": 0, "data": None}
def get_client_ip_address():
if "CF-Connecting-IP" in request.headers:
return request.headers["CF-Connecting-IP"]
if "X-Forwarded-For" in request.headers:
return request.headers["X-Forwarded-For"].split(",")[0].strip()
return request.remote_addr or "unknown"
def build_base_headers():
return {
"accept": "*/*",
"accept-language": "zh-CN,zh;q=0.9",
"authorization": AUTHORIZATION_TOKEN,
"chatgpt-account-id": ACCOUNT_ID,
"user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/135.0.0.0 Safari/537.36",
}
def build_invite_headers():
headers = build_base_headers()
headers.update(
{
"content-type": "application/json",
"origin": "https://chatgpt.com",
"referer": "https://chatgpt.com/",
'sec-ch-ua': '"Chromium";v="135", "Not)A;Brand";v="99", "Google Chrome";v="135"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"Windows"',
}
)
return headers
def parse_emails(raw_emails):
if not raw_emails:
return [], []
parts = raw_emails.replace("\n", ",").split(",")
emails = [p.strip() for p in parts if p.strip()]
valid = [e for e in emails if e.count("@") == 1]
return emails, valid
def validate_turnstile(turnstile_response):
if not turnstile_response:
return False
data = {
"secret": CF_TURNSTILE_SECRET_KEY,
"response": turnstile_response,
"remoteip": get_client_ip_address(),
}
try:
response = requests.post(
"https://challenges.cloudflare.com/turnstile/v0/siteverify",
data=data,
timeout=10,
)
result = response.json()
return result.get("success", False)
except Exception:
return False
def stats_expired():
if stats_cache["data"] is None:
return True
return time.time() - stats_cache["timestamp"] >= STATS_CACHE_TTL
def refresh_stats():
base_headers = build_base_headers()
subs_url = f"https://chatgpt.com/backend-api/subscriptions?account_id={ACCOUNT_ID}"
invites_url = f"https://chatgpt.com/backend-api/accounts/{ACCOUNT_ID}/invites?offset=0&limit=1&query="
subs_resp = requests.get(subs_url, headers=base_headers, timeout=10)
subs_resp.raise_for_status()
subs_data = subs_resp.json()
invites_resp = requests.get(invites_url, headers=base_headers, timeout=10)
invites_resp.raise_for_status()
invites_data = invites_resp.json()
stats = {
"seats_in_use": subs_data.get("seats_in_use"),
"seats_entitled": subs_data.get("seats_entitled"),
"pending_invites": invites_data.get("total"),
"plan_type": subs_data.get("plan_type"),
"active_start": subs_data.get("active_start"),
"active_until": subs_data.get("active_until"),
"billing_period": subs_data.get("billing_period"),
"billing_currency": subs_data.get("billing_currency"),
"will_renew": subs_data.get("will_renew"),
"is_delinquent": subs_data.get("is_delinquent"),
}
stats_cache["data"] = stats
stats_cache["timestamp"] = time.time()
return stats
@app.route("/")
def index():
client_ip = get_client_ip_address()
app.logger.info(f"Index page accessed by IP: {client_ip}")
return render_template("index.html", site_key=CF_TURNSTILE_SITE_KEY)
@app.route("/send-invites", methods=["POST"])
def send_invites():
client_ip = get_client_ip_address()
app.logger.info(f"Invitation request received from IP: {client_ip}")
raw_emails = request.form.get("emails", "").strip()
email_list, valid_emails = parse_emails(raw_emails)
cf_turnstile_response = request.form.get("cf-turnstile-response")
turnstile_valid = validate_turnstile(cf_turnstile_response)
if not turnstile_valid:
app.logger.warning(f"CAPTCHA verification failed for IP: {client_ip}")
return jsonify({"success": False, "message": "CAPTCHA verification failed. Please try again."})
if not email_list:
return jsonify({"success": False, "message": "Please enter at least one email address."})
if not valid_emails:
return jsonify({"success": False, "message": "Email addresses are not valid. Please check and try again."})
headers = build_invite_headers()
payload = {"email_addresses": valid_emails, "role": "standard-user", "resend_emails": True}
invite_url = f"https://chatgpt.com/backend-api/accounts/{ACCOUNT_ID}/invites"
try:
resp = requests.post(invite_url, headers=headers, json=payload, timeout=10)
if resp.status_code == 200:
app.logger.info(f"Successfully sent invitations to {len(valid_emails)} emails from IP: {client_ip}")
return jsonify(
{
"success": True,
"message": f"Successfully sent invitations for: {', '.join(valid_emails)}",
}
)
else:
app.logger.error(f"Failed to send invitations from IP: {client_ip}. Status code: {resp.status_code}")
return jsonify(
{
"success": False,
"message": "Failed to send invitations.",
"details": {"status_code": resp.status_code, "body": resp.text},
}
)
except Exception as e:
app.logger.error(f"Error sending invitations from IP: {client_ip}. Error: {str(e)}")
return jsonify({"success": False, "message": f"Error: {str(e)}"})
@app.route("/stats")
def stats():
client_ip = get_client_ip_address()
app.logger.info(f"Stats requested from IP: {client_ip}")
refresh = request.args.get("refresh") == "1"
try:
if refresh:
data = refresh_stats()
expired = False
else:
expired = stats_expired()
if stats_cache["data"] is None:
data = refresh_stats()
expired = False
else:
data = stats_cache["data"]
updated_at = None
if stats_cache["timestamp"]:
ts = stats_cache["timestamp"]
dt_utc = datetime.fromtimestamp(ts, tz=timezone.utc)
cst_tz = timezone(timedelta(hours=8))
dt_cst = dt_utc.astimezone(cst_tz)
updated_at = dt_cst.strftime("%Y-%m-%d %H:%M:%S")
return jsonify(
{
"success": True,
"data": data,
"expired": expired,
"updated_at": updated_at,
}
)
except Exception as e:
app.logger.error(f"Error fetching stats from IP: {client_ip}. Error: {str(e)}")
return jsonify({"success": False, "message": f"Error fetching stats: {str(e)}"}), 500
@app.errorhandler(404)
def not_found(e):
return jsonify({"error": "Not found"}), 404
if __name__ == "__main__":
app.run(debug=True, host="127.0.0.1", port=39001)

114
tg_bot.py
View File

@@ -35,10 +35,8 @@ from telegram.ext import (
)
# Local modules
from modules.register import OpenAIRegistrar
from modules.tempmail import TempMailClient
from modules.billing import EUBillingGenerator
from modules.iban_generator import GermanIbanGenerator
from flow import CompleteRegistrationFlow
from config import TEMPMAIL_CONFIG, DEBUG
# Bot configuration
@@ -413,7 +411,7 @@ async def button_callback(update: Update, context: ContextTypes.DEFAULT_TYPE):
async def perform_registration(message, count: int, generate_billing: bool, add_payment: bool = False, payment_info: dict = None):
"""执行注册流程"""
"""执行注册流程(使用 CompleteRegistrationFlow"""
# 发送开始消息
status_text = (
f"🔄 开始注册 {count} 个账号...\n"
@@ -421,6 +419,7 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
)
if add_payment:
status_text += "✅ 将添加支付方式\n"
status_text += "✅ 将保存到 Notion\n"
status_text += "\n⏳ 请稍候..."
status_msg = await message.reply_text(status_text)
@@ -451,29 +450,14 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
await status_msg.edit_text(f"❌ 初始化失败: {str(e)}")
return
# 如果需要支付,初始化 IBAN 生成
iban_generator = None
if add_payment:
# 创建完整流程管理
try:
await status_msg.edit_text(
f"🔄 开始注册 {count} 个账号...\n"
f"{'✅ 将生成账单 URL' if generate_billing else '❌ 不生成账单'}\n"
f"✅ 将添加支付方式\n\n"
f"⚙️ 正在初始化 IBAN 生成器..."
)
iban_generator = GermanIbanGenerator()
await status_msg.edit_text(
f"🔄 开始注册 {count} 个账号...\n"
f"{'✅ 将生成账单 URL' if generate_billing else '❌ 不生成账单'}\n"
f"✅ 将添加支付方式\n\n"
f"✅ IBAN 生成器已就绪"
)
flow = CompleteRegistrationFlow(tempmail_client)
if DEBUG:
print(f"✅ CompleteRegistrationFlow 初始化成功")
except Exception as e:
await status_msg.edit_text(
f"⚠️ IBAN 生成器初始化失败: {str(e)}\n\n"
f"将继续注册但不添加支付方式"
)
add_payment = False
await status_msg.edit_text(f"❌ 流程管理器初始化失败: {str(e)}")
return
# 注册账号
success_accounts = []
@@ -495,18 +479,20 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
# 生成密码
password = generate_random_password()
# 生成 IBAN (如果需要)
iban = None
if add_payment and iban_generator:
iban = iban_generator.generate(1)[0]
if DEBUG:
print(f"🔢 Generated IBAN: {iban}")
# 创建注册器
registrar = OpenAIRegistrar(tempmail_client=tempmail_client)
# 执行注册
result = registrar.register_with_auto_email(password)
# 根据模式执行不同的注册流程
if add_payment:
# 完整流程:注册 + 账单 + 支付 + Notion
result = flow.register_with_payment(
password=password,
payment_info=payment_info,
save_to_notion=True
)
elif generate_billing:
# 账单流程:注册 + 账单
result = flow.register_with_billing(password=password)
else:
# 基础流程:仅注册
result = flow.register_basic(password=password)
if not result.get('success'):
failed_accounts.append({
@@ -515,53 +501,7 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
})
continue
email = result['email']
account_info = {
'email': email,
'password': password,
'verified': result.get('verified', False)
}
# 如果需要生成账单
if generate_billing:
try:
access_token = registrar._step5_get_access_token()
billing_gen = EUBillingGenerator()
billing_result = billing_gen.generate_checkout_url(access_token)
if billing_result.success:
account_info['access_token'] = access_token
account_info['checkout_url'] = billing_result.checkout_url
# 如果需要添加支付方式
if add_payment and payment_info and iban:
try:
payment_result = registrar.add_payment_method(
checkout_session_url=billing_result.checkout_url,
iban=iban,
name=payment_info.get('name', 'John Doe'),
email=email,
address_line1=payment_info.get('address_line1', '123 Main Street'),
city=payment_info.get('city', 'New York'),
postal_code=payment_info.get('postal_code', '10001'),
state=payment_info.get('state', 'NY'),
country=payment_info.get('country', 'US')
)
if payment_result.get('success'):
account_info['payment_added'] = True
account_info['iban'] = iban
else:
account_info['payment_error'] = payment_result.get('error', 'Unknown error')
except Exception as e:
account_info['payment_error'] = str(e)
else:
account_info['billing_error'] = billing_result.error
except Exception as e:
account_info['billing_error'] = str(e)
success_accounts.append(account_info)
success_accounts.append(result)
except Exception as e:
failed_accounts.append({
@@ -608,6 +548,12 @@ async def perform_registration(message, count: int, generate_billing: bool, add_
elif 'payment_error' in acc:
account_text += f"\n⚠️ 支付: {acc['payment_error']}\n"
# Notion 保存状态
if 'notion_saved' in acc and acc['notion_saved']:
account_text += f"\n📝 **Notion**\n✅ 已保存到数据库\n"
elif 'notion_error' in acc:
account_text += f"\n⚠️ Notion: {acc['notion_error']}\n"
account_text += "\n━━━━━━━━━━━━━━━━━━━"
await message.reply_text(account_text, parse_mode='Markdown')