feat(payment): Integrate Stripe API and refactor payment flow

- Add new stripe_api.py module with StripePaymentAPI class and payment retry logic
- Import Stripe payment module in auto_gpt_team.py with graceful fallback handling
- Refactor run_payment_flow() to extract form filling logic into _fill_payment_form()
- Simplify error handling by returning structured tuples (success, error_type, error_msg)
- Remove redundant comments and streamline selector logic for payment form elements
- Improve code maintainability by separating concerns between form filling and flow orchestration
- Add STRIPE_API_AVAILABLE flag to track payment module availability at runtime
This commit is contained in:
2026-01-30 09:57:55 +08:00
parent ad03fab8e9
commit 11395bf1ba
3 changed files with 992 additions and 477 deletions

View File

@@ -40,6 +40,19 @@ except ImportError:
api_login_flow = None
ShutdownRequested = Exception # 回退到基础异常类
# 导入 Stripe API 支付模块
try:
from stripe_api import (
StripePaymentAPI,
api_payment_with_retry,
is_stripe_api_available,
)
STRIPE_API_AVAILABLE = is_stripe_api_available()
except ImportError:
STRIPE_API_AVAILABLE = False
StripePaymentAPI = None
api_payment_with_retry = None
# ================= 配置加载 =================
try:
import tomllib
@@ -869,110 +882,37 @@ def _is_connection_lost(error_msg):
return any(kw in error_lower for kw in disconnect_keywords)
def run_payment_flow(page, email, step_callback=None):
"""执行 SEPA 支付流程 - 严格按顺序执行,任一步骤失败则终止
Args:
page: 浏览器页面对象
email: 邮箱地址
step_callback: 步骤回调函数 (step: str)
def _fill_payment_form(page, email, sepa_iban, street, postal_code, city, account_name, step_callback=None):
"""填写支付表单(内部函数,供重试使用)
Returns:
dict: 成功时返回 {"token": ..., "account_id": ...}
被停止时返回 {"stopped": True}
失败时返回 None
tuple: (success: bool, error_type: str, error_msg: str)
error_type: "stopped", "iban_error", "fatal", "success"
"""
def step_cb(step):
if step_callback:
step_callback(step)
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求,中断支付流程")
return {"stopped": True}
log_status("支付流程", "开始处理 Stripe 支付页...")
try:
# 等待支付页加载完成
step_cb("等待支付页加载...")
log_status("支付页", "等待支付页加载...")
# 等待页面稳定Stripe 支付页需要时间渲染)
time.sleep(3)
# 尝试多种方式定位邮箱输入框
email_input = None
for attempt in range(3):
# 方式1: 直接 ID
email_input = page.ele('#email', timeout=5)
if email_input:
break
# 方式2: name 属性
email_input = page.ele('@name=email', timeout=3)
if email_input:
break
# 方式3: CSS 选择器
email_input = page.ele('css:input[type="email"], input[id*="email"], input[name*="email"]', timeout=3)
if email_input:
break
log_progress(f"[尝试{attempt+1}] 等待邮箱输入框...")
time.sleep(2)
if email_input:
log_progress("[OK] 支付页已加载")
else:
log_progress("[!] 邮箱输入框未立即出现,继续尝试...")
time.sleep(3)
# 随机选择 IBAN 和地址
ibans = get_sepa_ibans()
if not ibans:
log_progress("[X] 没有可用的 IBAN请先通过 Bot 导入")
return None
sepa_iban = random.choice(ibans)
street, postal_code, city = random.choice(SEPA_ADDRESSES)
account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
log_progress(f"使用 IBAN: {sepa_iban[:8]}...")
log_progress(f"使用地址: {street}, {postal_code} {city}")
log_progress(f"账户名: {account_name}")
# ========== 步骤 1: 填写邮箱 ==========
step_cb("填写支付邮箱...")
log_progress("[1] 填写邮箱...")
try:
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求")
return {"stopped": True}
# 如果之前已经找到了,直接使用;否则重新查找
if not email_input:
if _is_shutdown_requested():
return False, "stopped", "用户停止"
email_input = page.ele('#email', timeout=10)
if not email_input:
email_input = page.ele('@name=email', timeout=5)
if not email_input:
email_input = page.ele('css:input[type="email"]', timeout=5)
if not email_input:
log_progress("[X] 邮箱输入框未找到")
log_progress(f"当前URL: {page.url}")
return None
return False, "fatal", "邮箱输入框未找到"
email_input.clear()
email_input.input(email)
log_progress(f"[OK] 已填写邮箱: {email}")
time.sleep(1)
except Exception as e:
error_msg = str(e)
if _is_connection_lost(error_msg):
log_progress("[!] 浏览器连接断开")
return {"stopped": True}
log_progress(f"[X] 邮箱填写失败: {e}")
log_progress(f"当前URL: {page.url}")
return None
# ========== 步骤 2: 选择 SEPA ==========
step_cb("选择 SEPA 支付方式...")
@@ -980,10 +920,9 @@ def run_payment_flow(page, email, step_callback=None):
time.sleep(1)
sepa_clicked = False
# 定位方式(按速度排序:属性选择器 > CSS > xpath
sepa_selectors = [
'@data-testid=sepa_debit-accordion-item-button', # 最快:属性选择器
'css:button[data-testid*="sepa"]', # 快CSS 模糊匹配
'@data-testid=sepa_debit-accordion-item-button',
'css:button[data-testid*="sepa"]',
]
for selector in sepa_selectors:
try:
@@ -997,7 +936,6 @@ def run_payment_flow(page, email, step_callback=None):
continue
if not sepa_clicked:
# 最后尝试 JS
try:
result = page.run_js('''
const btns = document.querySelectorAll('button');
@@ -1016,18 +954,15 @@ def run_payment_flow(page, email, step_callback=None):
pass
if not sepa_clicked:
log_progress("[X] SEPA 选择失败")
return None
return False, "fatal", "SEPA 选择失败"
# 验证 SEPA 是否真正展开(检查 IBAN 输入框是否出现)
# 验证 SEPA 是否展开
try:
iban_check = page.ele('#iban', timeout=5)
if not iban_check:
log_progress("[X] SEPA 未展开")
return None
return False, "fatal", "SEPA 未展开"
except:
log_progress("[X] SEPA 未展开")
return None
return False, "fatal", "SEPA 未展开"
# ========== 步骤 3: 填写 IBAN ==========
step_cb("填写 IBAN...")
@@ -1035,12 +970,10 @@ def run_payment_flow(page, email, step_callback=None):
try:
iban_input = page.ele('#iban', timeout=5)
if not iban_input:
log_progress("[X] IBAN 输入框未找到")
return None
return False, "fatal", "IBAN 输入框未找到"
iban_input.input(sepa_iban)
except Exception as e:
log_progress(f"[X] IBAN 填写失败: {e}")
return None
return False, "fatal", f"IBAN 填写失败: {e}"
# ========== 步骤 4: 填写账户姓名 ==========
step_cb("填写账户姓名...")
@@ -1096,14 +1029,12 @@ def run_payment_flow(page, email, step_callback=None):
log_progress("[7] 点击订阅...")
time.sleep(1)
# 多种方式尝试点击订阅按钮Linux 无头模式下需要更可靠的方式)
subscribe_clicked = False
# 方式1: 使用 DrissionPage click
# 方式1: DrissionPage click
try:
subscribe_btn = page.ele('css:button[type="submit"]', timeout=5)
if subscribe_btn:
# 确保按钮可见并滚动到视图
page.run_js('arguments[0].scrollIntoView({block: "center"})', subscribe_btn)
time.sleep(0.5)
subscribe_btn.click()
@@ -1112,7 +1043,7 @@ def run_payment_flow(page, email, step_callback=None):
except Exception as e:
log_progress(f"[!] 方式1点击失败: {e}")
# 方式2: 使用 JS 点击
# 方式2: JS click
if not subscribe_clicked:
try:
result = page.run_js('''
@@ -1127,12 +1058,10 @@ def run_payment_flow(page, email, step_callback=None):
if result == "clicked":
subscribe_clicked = True
log_progress("[OK] 点击订阅按钮 (方式2-JS)")
else:
log_progress(f"[!] 方式2: {result}")
except Exception as e:
log_progress(f"[!] 方式2点击失败: {e}")
# 方式3: 使用 JS dispatchEvent 模拟点击
# 方式3: dispatchEvent
if not subscribe_clicked:
try:
result = page.run_js('''
@@ -1154,20 +1083,113 @@ def run_payment_flow(page, email, step_callback=None):
except Exception as e:
log_progress(f"[!] 方式3点击失败: {e}")
# 记录当前 URL 用于调试
return True, "success", ""
except Exception as e:
error_msg = str(e)
if _is_connection_lost(error_msg):
return False, "stopped", "浏览器连接断开"
return False, "fatal", str(e)
def run_payment_flow(page, email, step_callback=None, max_retries: int = 3):
"""执行 SEPA 支付流程 - 支持 IBAN 重试
Args:
page: 浏览器页面对象
email: 邮箱地址
step_callback: 步骤回调函数 (step: str)
max_retries: IBAN 重试次数
Returns:
dict: 成功时返回 {"token": ..., "account_id": ...}
被停止时返回 {"stopped": True}
失败时返回 None
"""
def step_cb(step):
if step_callback:
step_callback(step)
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求,中断支付流程")
return {"stopped": True}
log_status("支付流程", "开始处理 Stripe 支付页...")
try:
log_progress(f"点击后URL: {page.url}")
# 等待支付页加载完成
step_cb("等待支付页加载...")
log_status("支付页", "等待支付页加载...")
time.sleep(3)
# 检查 IBAN 列表
ibans = get_sepa_ibans()
if not ibans:
log_progress("[X] 没有可用的 IBAN请先通过 Bot 导入")
return None
# 用于跟踪已使用的 IBAN避免重复
used_ibans = set()
last_error = ""
for retry in range(max_retries):
if _is_shutdown_requested():
return {"stopped": True}
# 选择一个未使用过的 IBAN
available_ibans = [iban for iban in ibans if iban not in used_ibans]
if not available_ibans:
# 所有 IBAN 都用过了,重置
available_ibans = ibans
used_ibans.clear()
sepa_iban = random.choice(available_ibans)
used_ibans.add(sepa_iban)
street, postal_code, city = random.choice(SEPA_ADDRESSES)
account_name = f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
if retry == 0:
log_progress(f"使用 IBAN: {sepa_iban[:8]}...")
log_progress(f"使用地址: {street}, {postal_code} {city}")
log_progress(f"账户名: {account_name}")
else:
log_progress(f"[!] 重试 {retry}/{max_retries-1},更换 IBAN: {sepa_iban[:8]}...")
# 如果是重试,需要刷新页面重新填写
if retry > 0:
log_progress("[!] 刷新页面重新填写...")
try:
page.refresh()
time.sleep(3)
except:
pass
# ========== 步骤 8: 等待支付成功 ==========
# 填写表单
success, error_type, error_msg = _fill_payment_form(
page, email, sepa_iban, street, postal_code, city, account_name, step_callback
)
if error_type == "stopped":
return {"stopped": True}
if not success:
last_error = error_msg
if error_type == "fatal":
# 致命错误,不重试
log_progress(f"[X] 致命错误: {error_msg}")
return None
# IBAN 相关错误,继续重试
log_progress(f"[!] 表单填写失败: {error_msg}")
continue
# 表单填写成功,等待支付结果
step_cb("等待支付处理...")
log_status("等待", "等待支付处理(超时90秒)...")
# 使用轮询方式等待支付成功
payment_success = False
start_time = time.time()
max_wait = 90 # 最多等待 90 秒
max_wait = 90
last_log_time = 0
last_url = ""
@@ -1175,79 +1197,57 @@ def run_payment_flow(page, email, step_callback=None):
try:
current_url = page.url
# URL 变化时输出日志
if current_url != last_url:
log_progress(f"[URL变化] {current_url[:80]}...")
last_url = current_url
# 检查是否支付成功
if 'payments/success' in current_url or 'success-team' in current_url:
payment_success = True
log_status("成功", "[OK] 支付成功!")
break
# 检查是否有错误页面
if 'error' in current_url.lower() or 'failed' in current_url.lower():
log_status("失败", f"[X] 支付失败: {current_url}")
return None
last_error = "支付页面显示错误"
break
# 检查页面上是否有错误提示Stripe 可能显示错误但不改变 URL
# 检查页面错误提示
try:
error_elem = page.ele('css:[class*="error"], [class*="Error"], [role="alert"]', timeout=0.5)
if error_elem and error_elem.text:
error_text = error_elem.text[:100]
if error_text and 'error' in error_text.lower():
if error_text and ('error' in error_text.lower() or 'invalid' in error_text.lower() or 'unusable' in error_text.lower()):
log_progress(f"[!] 页面错误: {error_text}")
# IBAN 相关错误,标记重试
if 'iban' in error_text.lower() or 'bank' in error_text.lower() or 'unusable' in error_text.lower():
last_error = f"IBAN 错误: {error_text}"
break
except:
pass
except Exception as e:
# 页面加载中可能会抛出异常,记录但继续等待
log_progress(f"[!] 页面访问异常: {str(e)[:50]}")
# 检查停止请求
if _is_shutdown_requested():
log_progress("[!] 检测到停止请求")
return {"stopped": True}
# 每 10 秒输出一次日志
elapsed = int(time.time() - start_time)
if elapsed >= last_log_time + 10:
log_progress(f"[等待中] 已等待 {elapsed} 秒...")
last_log_time = elapsed
# 每 30 秒检查一次页面状态
if elapsed % 30 == 0:
try:
# 检查是否还在支付页
if 'pay.openai.com' in current_url or 'checkout.stripe.com' in current_url:
log_progress("[!] 仍在支付页,可能卡住了")
# 尝试再次点击提交按钮
try:
log_progress("[!] 仍在支付页,尝试重新点击提交")
page.run_js('document.querySelector("button[type=submit]")?.click()')
log_progress("[!] 尝试重新点击提交按钮")
except:
pass
except:
pass
time.sleep(2)
if not payment_success:
log_status("超时", "[X] 支付未完成(超时)")
try:
log_progress(f"最终URL: {page.url}")
# 尝试获取页面截图信息(调试用)
try:
page_title = page.run_js('return document.title')
log_progress(f"页面标题: {page_title}")
except:
pass
except:
pass
return None
# ========== 步骤 9: 获取 token 和 account_id ==========
if payment_success:
# 支付成功,获取 token
step_cb("获取 Token...")
log_status("获取", "正在获取 access token...")
time.sleep(1)
@@ -1255,7 +1255,6 @@ def run_payment_flow(page, email, step_callback=None):
time.sleep(1)
try:
# 获取页面内容JSON
session_text = page.ele('tag:pre', timeout=5).text
import json
session_data = json.loads(session_text)
@@ -1264,11 +1263,8 @@ def run_payment_flow(page, email, step_callback=None):
if access_token:
log_status("成功", f"Token: {access_token[:50]}...")
# 优先从 session 数据直接提取 account_id最快
step_cb("获取 Account ID...")
account_id = fetch_account_id_from_session(session_data)
# 如果 session 中没有,再通过 API 获取
if not account_id:
account_id = fetch_account_id(page, access_token)
@@ -1286,9 +1282,17 @@ def run_payment_flow(page, email, step_callback=None):
log_progress(f"[X] 获取 token 失败: {e}")
return None
# 支付未成功,检查是否需要重试
if retry < max_retries - 1:
log_progress(f"[!] 支付未成功: {last_error},准备重试...")
continue
# 所有重试都失败
log_status("超时", f"[X] 支付失败(已重试 {max_retries} 次): {last_error}")
return None
except Exception as e:
error_msg = str(e)
# 只有连接断开才认为是停止请求,普通异常按错误处理
if _is_connection_lost(error_msg):
log_status("停止", "[!] 浏览器连接断开,支付流程已中断")
return {"stopped": True}
@@ -1296,15 +1300,15 @@ def run_payment_flow(page, email, step_callback=None):
return None
def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool = True, step_callback=None):
"""使用 API 会话的 cookies 注入浏览器完成支付 (协议模式专用)
def api_pay_with_session(reg, email: str, proxy: str = None, step_callback=None, max_retries: int = 3):
"""使用 API 完成支付流程(带 IBAN 重试)
Args:
reg: ChatGPTAPIRegister 对象
email: 邮箱
proxy: 代理地址
headless: 是否无头模式
step_callback: 步骤回调
max_retries: IBAN 重试次数
Returns:
dict: {"token": ..., "account_id": ...} 或 None
@@ -1313,6 +1317,10 @@ def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool
if step_callback:
step_callback(step)
if not STRIPE_API_AVAILABLE:
log_status("警告", "Stripe API 模式不可用,回退到浏览器模式")
return None
step_cb("获取支付页 URL...")
# 通过 API 获取支付页 URL
@@ -1323,154 +1331,121 @@ def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool
log_progress(f"[OK] 支付页: {checkout_url[:60]}...")
# 获取 cookies
cookies = reg.get_cookies()
log_status("Cookie", f"获取到 {len(cookies)} 个 cookies")
# 使用 API 支付(带 IBAN 重试)
step_cb("API 支付处理中...")
# 启动浏览器
step_cb("启动浏览器...")
temp_user_data = tempfile.mkdtemp(prefix="chrome_api_")
def get_iban():
ibans = get_sepa_ibans()
return random.choice(ibans) if ibans else ""
# 检测操作系统
is_linux = platform.system() == "Linux"
def get_address():
return random.choice(SEPA_ADDRESSES)
# 获取随机指纹
fingerprint = None
if RANDOM_FINGERPRINT:
fingerprint = get_random_fingerprint()
else:
fingerprint = {
"user_agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/144.0.0.0 Safari/537.36",
"platform": "Win32",
"webgl_vendor": "Google Inc. (NVIDIA)",
"webgl_renderer": "ANGLE (NVIDIA, NVIDIA GeForce RTX 3060 Direct3D11 vs_5_0 ps_5_0)",
"screen": {"width": 1920, "height": 1080}
def get_name():
return f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
success, error = api_payment_with_retry(
checkout_url=checkout_url,
email=email,
session=reg.session if hasattr(reg, 'session') else None,
proxy=proxy,
max_retries=max_retries,
get_iban_func=get_iban,
get_address_func=get_address,
get_name_func=get_name,
progress_callback=lambda msg: log_progress(msg)
)
if not success:
log_status("失败", f"API 支付失败: {error}")
return None
# 支付成功,获取 token 和 account_id
step_cb("获取 Token...")
log_status("获取", "正在获取 access token...")
token = reg.get_session_token()
if not token:
log_status("警告", "支付成功但无法获取 token")
return None
log_status("成功", f"Token: {token[:50]}...")
# 获取 account_id
step_cb("获取 Account ID...")
account_id = ""
try:
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}
resp = requests.get(
"https://chatgpt.com/backend-api/accounts/check/v4-2023-04-27",
headers=headers,
timeout=10
)
if resp.status_code == 200:
data = resp.json()
accounts = data.get("accounts", {})
for acc_id, acc_info in accounts.items():
if acc_id == "default":
continue
account_data = acc_info.get("account", {})
plan_type = account_data.get("plan_type", "")
if "team" in plan_type.lower():
account_id = acc_id
break
if not account_id:
for acc_id in accounts.keys():
if acc_id != "default":
account_id = acc_id
break
except Exception as e:
log_progress(f"获取 account_id 失败: {e}")
if account_id:
log_progress(f"account_id: {account_id[:8]}...")
return {
"token": token,
"account_id": account_id
}
co = ChromiumOptions()
co.set_argument('--no-first-run')
co.set_argument('--no-default-browser-check')
co.set_argument(f'--user-data-dir={temp_user_data}')
co.set_argument('--disable-blink-features=AutomationControlled')
co.set_argument('--disable-infobars')
co.set_argument('--disable-dev-shm-usage')
co.set_argument('--no-sandbox')
co.set_argument('--disable-gpu') # Linux 无头模式必需
co.set_argument('--incognito') # 无痕模式
co.set_argument('--lang=en-US') # 设置语言为英文Stripe 页面)
co.set_argument(f'--user-agent={fingerprint["user_agent"]}')
screen = fingerprint.get("screen", {"width": 1920, "height": 1080})
def browser_pay_with_cookies(reg, email: str, proxy: str = None, headless: bool = True,
step_callback=None, max_retries: int = 3):
"""使用 API 完成支付流程(纯 API 模式,不使用浏览器)
if headless:
co.set_argument('--headless=new')
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
else:
co.set_argument(f'--window-size={screen["width"]},{screen["height"]}')
Args:
reg: ChatGPTAPIRegister 对象
email: 邮箱
proxy: 代理地址
headless: 已废弃,保留兼容性
step_callback: 步骤回调
max_retries: IBAN 重试次数
if proxy:
co.set_argument(f'--proxy-server={proxy}')
Returns:
dict: {"token": ..., "account_id": ...}
支付失败返回 {"payment_failed": True}
用户停止返回 {"stopped": True}
"""
def step_cb(step):
if step_callback:
step_callback(step)
if is_linux:
co.set_argument('--disable-software-rasterizer')
co.set_argument('--disable-extensions')
co.set_argument('--disable-setuid-sandbox')
# 注意:不使用 --single-process可能导致 Stripe 页面异常
co.set_argument('--remote-debugging-port=0')
co.set_argument('--disable-background-networking')
co.set_argument('--disable-default-apps')
co.set_argument('--disable-sync')
co.set_argument('--metrics-recording-only')
co.set_argument('--mute-audio')
co.set_argument('--no-zygote') # 替代 single-process更稳定
chrome_paths = [
'/usr/bin/google-chrome',
'/usr/bin/google-chrome-stable',
'/usr/bin/chromium-browser',
'/usr/bin/chromium',
'/snap/bin/chromium',
]
for chrome_path in chrome_paths:
if os.path.exists(chrome_path):
co.set_browser_path(chrome_path)
log_status("浏览器", f"使用: {chrome_path}")
break
else:
co.auto_port(True)
co.set_local_port(random.randint(19222, 29999))
log_status("浏览器", f"正在启动 ({'无头' if headless else '有头'}模式)...")
page = ChromiumPage(co)
try:
# 注入指纹
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
# 先访问 chatgpt.com 注入 cookies
step_cb("注入登录状态...")
log_status("Cookie", "注入登录状态...")
page.get("https://chatgpt.com")
injected_count = 0
for cookie in cookies:
try:
if 'chatgpt.com' in cookie.get('domain', ''):
page.set.cookies({
'name': cookie['name'],
'value': cookie['value'],
'domain': cookie['domain'].lstrip('.'),
'path': cookie.get('path', '/'),
})
injected_count += 1
except:
pass
log_progress(f"[OK] 已注入 {injected_count} 个 cookies")
# 刷新页面确保 cookies 生效
time.sleep(1)
page.refresh()
time.sleep(1)
# 直接跳转到支付页
step_cb("跳转到支付页...")
log_status("订阅", "跳转到支付页...")
page.get(checkout_url)
try:
page.wait.url_change('pay.openai.com', timeout=15)
log_progress("[OK] 已跳转到支付页")
except:
time.sleep(2)
# 重新注入指纹(跳转到新域名后需要重新注入)
if RANDOM_FINGERPRINT:
inject_fingerprint(page, fingerprint)
log_progress("[OK] 已重新注入指纹到支付页")
# 等待页面完全加载
time.sleep(2)
# 执行支付流程
step_cb("执行 SEPA 支付...")
result = run_payment_flow(page, email, step_cb)
# 使用 API 支付模式
if STRIPE_API_AVAILABLE:
log_status("支付", "使用 API 支付模式...")
result = api_pay_with_session(reg, email, proxy, step_callback, max_retries=max_retries)
if result:
return result
except Exception as e:
log_status("错误", f"浏览器流程异常: {e}")
return None
finally:
try:
page.quit()
except:
pass
# 清理临时目录
try:
import shutil
shutil.rmtree(temp_user_data, ignore_errors=True)
except:
pass
# API 支付失败,返回特殊标记让调用方重新创建用户
log_status("失败", "API 支付失败,需要重新创建用户")
return {"payment_failed": True}
else:
log_status("错误", "Stripe API 模式不可用,请安装 curl_cffi")
return {"payment_failed": True}
def fetch_account_id(page, access_token: str) -> str:
@@ -2289,13 +2264,16 @@ def run_single_registration(progress_callback=None, step_callback=None) -> dict:
cleanup_chrome_processes()
def run_single_registration_api(progress_callback=None, step_callback=None, proxy: str = None) -> dict:
"""执行单次注册流程 - 协议模式 (API + Cookie 注入浏览器)
def run_single_registration_api(progress_callback=None, step_callback=None, proxy: str = None, max_user_retries: int = 3) -> dict:
"""执行单次注册流程 - 协议模式 (API + API 支付)
支付失败时会自动重新创建用户重试,最多重试 max_user_retries 次
Args:
progress_callback: 进度回调函数 (message: str)
step_callback: 步骤回调函数 (step: str)
proxy: 代理地址
max_user_retries: 支付失败后重新创建用户的最大次数
Returns:
dict: {"success": bool, "account": str, "password": str, "token": str, "account_id": str, "error": str}
@@ -2328,6 +2306,20 @@ def run_single_registration_api(progress_callback=None, step_callback=None, prox
if not ibans:
return {"success": False, "error": "没有可用的 IBAN请先通过 /iban_add 导入"}
# 使用配置的代理或传入的代理
use_proxy = proxy or API_PROXY or None
if use_proxy:
log_status("代理", f"使用代理: {use_proxy}")
last_error = ""
for user_retry in range(max_user_retries):
if _is_shutdown_requested():
return {"success": False, "error": "用户停止", "stopped": True}
if user_retry > 0:
log_status("重试", f"========== 重新创建用户 (第 {user_retry + 1}/{max_user_retries} 次) ==========")
step_cb("生成账号信息...")
# 生成账号信息
@@ -2355,12 +2347,7 @@ def run_single_registration_api(progress_callback=None, step_callback=None, prox
log_status("初始化", f"生成账号: {email}")
log_status("初始化", f"设置密码: {password}")
log_status("初始化", f"姓名: {real_name} | 生日: {birthdate}")
log_status("模式", "协议模式 (API + Cookie 注入)")
# 使用配置的代理或传入的代理
use_proxy = proxy or API_PROXY or None
if use_proxy:
log_status("代理", f"使用代理: {use_proxy}")
log_status("模式", "协议模式 (API)")
try:
# 阶段 1: API 注册
@@ -2380,55 +2367,26 @@ def run_single_registration_api(progress_callback=None, step_callback=None, prox
if not reg:
log_status("失败", "API 注册失败")
return {"success": False, "error": "API 注册失败", "account": email, "password": password}
last_error = "API 注册失败"
continue # 重新创建用户
log_status("完成", "[OK] API 注册成功!")
# 阶段 2: Cookie 注入浏览器 + 支付
step_cb("Cookie 注入浏览器...")
log_status("阶段 2", "========== Cookie 注入浏览器 + 订阅支付 ==========")
# 阶段 2: API 支付
step_cb("API 支付...")
log_status("阶段 2", "========== API 支付 ==========")
result = browser_pay_with_cookies(
reg=reg,
email=email,
proxy=use_proxy,
headless=True,
step_callback=step_cb
step_callback=step_cb,
max_retries=3 # IBAN 重试 3 次
)
if result and result.get("stopped"):
log_status("停止", "[!] 注册被用户停止")
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
elif result and result.get("token"):
step_cb("注册成功!")
log_status("完成", "[OK] 全部流程完成!")
return {
"success": True,
"account": email,
"password": password,
"token": result["token"],
"account_id": result.get("account_id", "")
}
# 如果 Cookie 注入失败,尝试 API 登录方式
log_status("重试", "Cookie 注入失败,尝试 API 登录...")
step_cb("尝试 API 登录...")
reg2 = api_login_flow(
email=email,
password=password,
proxy=use_proxy,
progress_callback=log_cb
)
if reg2:
result = browser_pay_with_cookies(
reg=reg2,
email=email,
proxy=use_proxy,
headless=True,
step_callback=step_cb
)
if result and result.get("token"):
step_cb("注册成功!")
@@ -2441,23 +2399,34 @@ def run_single_registration_api(progress_callback=None, step_callback=None, prox
"account_id": result.get("account_id", "")
}
if result and result.get("payment_failed"):
log_status("失败", "支付失败,准备重新创建用户...")
last_error = "支付流程失败"
continue # 重新创建用户
# 其他失败情况
log_status("失败", "注册成功但支付/获取token失败")
return {"success": False, "error": "支付流程失败", "account": email, "password": password}
last_error = "支付流程失败"
continue # 重新创建用户
except ShutdownRequested:
log_status("停止", "[!] 用户请求停止")
return {"success": False, "error": "用户停止", "stopped": True, "account": email, "password": password}
except Exception as e:
error_msg = str(e)
# 只有连接断开才认为是停止请求
if _is_connection_lost(error_msg):
log_status("停止", "[!] 浏览器连接断开")
return {"success": False, "error": "浏览器连接断开", "stopped": True, "account": email, "password": password}
log_status("停止", "[!] 连接断开")
return {"success": False, "error": "连接断开", "stopped": True, "account": email, "password": password}
log_status("错误", f"注册异常: {e}")
return {"success": False, "error": str(e), "account": email, "password": password}
last_error = str(e)
continue # 重新创建用户
finally:
cleanup_chrome_processes()
# 所有重试都失败
log_status("失败", f"已重试 {max_user_retries} 次,全部失败")
return {"success": False, "error": f"重试 {max_user_retries} 次后仍失败: {last_error}"}
def run_single_registration_auto(progress_callback=None, step_callback=None, mode: str = None) -> dict:
"""自动选择模式执行注册

418
stripe_api.py Normal file
View File

@@ -0,0 +1,418 @@
"""
Stripe SEPA 支付 API 模块
- 使用纯 API 方式完成 Stripe SEPA 支付
- 参考 team-reg-go/stripe/stripe.go 实现
"""
import re
import time
import uuid
import random
try:
from curl_cffi import requests as curl_requests
CURL_CFFI_AVAILABLE = True
except ImportError:
CURL_CFFI_AVAILABLE = False
curl_requests = None
import requests
def log_status(step, message):
"""日志输出"""
timestamp = time.strftime("%H:%M:%S")
print(f"[{timestamp}] [{step}] {message}")
def log_progress(message):
"""进度输出"""
print(f" {message}")
# Stripe 配置常量
STRIPE_VERSION = "2024-12-18.acacia"
STRIPE_JS_VERSION = "d7c0c7e0e4e3e8e9e0e1e2e3e4e5e6e7"
OPENAI_STRIPE_PUBLIC_KEY = "pk_live_51LVfqJKa3k6aumIYFMNZILdA00QBnLMNa"
def generate_stripe_fingerprint() -> str:
"""生成 Stripe 指纹 ID"""
return uuid.uuid4().hex
def extract_session_id(checkout_url: str) -> str:
"""从 checkout URL 提取 session_id"""
match = re.search(r'(cs_(?:live|test)_[a-zA-Z0-9]+)', checkout_url)
if match:
return match.group(1)
return ""
class StripePaymentAPI:
"""Stripe SEPA 支付 API 处理器"""
def __init__(self, checkout_url: str, session=None, proxy: str = None):
"""初始化
Args:
checkout_url: Stripe checkout URL
session: 可选的 curl_cffi session (复用已有会话)
proxy: 代理地址
"""
self.checkout_url = checkout_url
self.session_id = extract_session_id(checkout_url)
self.stripe_public_key = OPENAI_STRIPE_PUBLIC_KEY
self.init_checksum = ""
self.js_checksum = ""
self.guid = generate_stripe_fingerprint()
self.muid = generate_stripe_fingerprint()
self.sid = generate_stripe_fingerprint()
self.client_session_id = ""
self.checkout_config_id = ""
# 创建或复用 session
if session:
self.session = session
elif CURL_CFFI_AVAILABLE:
self.session = curl_requests.Session(
impersonate="chrome",
verify=False,
proxies={"http": proxy, "https": proxy} if proxy else {}
)
else:
self.session = requests.Session()
if proxy:
self.session.proxies = {"http": proxy, "https": proxy}
def _get_stripe_headers(self) -> dict:
"""获取 Stripe API 请求头"""
return {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
"Origin": "https://pay.openai.com",
"Referer": "https://pay.openai.com/",
"Sec-Fetch-Site": "cross-site",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Dest": "empty",
"Sec-Ch-Ua-Platform": '"Windows"',
"Accept-Language": "en-US,en;q=0.9",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
}
def fetch_checkout_page(self) -> bool:
"""获取 checkout 页面参数"""
try:
headers = {
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/131.0.0.0 Safari/537.36"
}
resp = self.session.get(self.checkout_url, headers=headers, timeout=30)
if resp.status_code == 200:
html = resp.text
# 提取 initChecksum
init_match = re.search(r'"initChecksum":\s*"([^"]+)"', html)
if init_match:
self.init_checksum = init_match.group(1)
# 提取 jsChecksum
js_match = re.search(r'"jsChecksum":\s*"([^"]+)"', html)
if js_match:
self.js_checksum = js_match.group(1)
return True
return False
except Exception as e:
log_progress(f"[!] 获取 checkout 页面失败: {e}")
return False
def create_payment_method(self, iban: str, name: str, email: str,
address: str, city: str, postal_code: str,
country: str = "DE") -> str:
"""Step 1: 创建支付方式
Returns:
str: payment_method_id失败返回空字符串
"""
api_url = "https://api.stripe.com/v1/payment_methods"
self.client_session_id = str(uuid.uuid4())
self.checkout_config_id = str(uuid.uuid4())
data = {
"type": "sepa_debit",
"sepa_debit[iban]": iban,
"billing_details[name]": name,
"billing_details[email]": email,
"billing_details[address][country]": country,
"billing_details[address][line1]": address,
"billing_details[address][city]": city,
"billing_details[address][postal_code]": postal_code,
"guid": self.guid,
"muid": self.muid,
"sid": self.sid,
"_stripe_version": STRIPE_VERSION,
"key": self.stripe_public_key,
"payment_user_agent": f"stripe.js/{STRIPE_JS_VERSION}; stripe-js-v3/{STRIPE_JS_VERSION}; checkout",
"client_attribution_metadata[client_session_id]": self.client_session_id,
"client_attribution_metadata[checkout_session_id]": self.session_id,
"client_attribution_metadata[merchant_integration_source]": "checkout",
"client_attribution_metadata[merchant_integration_version]": "hosted_checkout",
"client_attribution_metadata[payment_method_selection_flow]": "automatic",
"client_attribution_metadata[checkout_config_id]": self.checkout_config_id,
}
try:
resp = self.session.post(api_url, data=data, headers=self._get_stripe_headers(), timeout=30)
if resp.status_code == 200:
result = resp.json()
payment_method_id = result.get("id", "")
if payment_method_id:
return payment_method_id
error_text = resp.text[:200] if resp.text else "无响应"
log_progress(f"[X] 创建支付方式失败: {resp.status_code} - {error_text}")
return ""
except Exception as e:
log_progress(f"[X] 创建支付方式异常: {e}")
return ""
def confirm_payment(self, payment_method_id: str, captcha_token: str = "",
rv_timestamp: str = "") -> tuple:
"""Step 2: 确认支付
Returns:
tuple: (success: bool, error_message: str)
"""
api_url = f"https://api.stripe.com/v1/payment_pages/{self.session_id}/confirm"
data = {
"eid": "NA",
"payment_method": payment_method_id,
"expected_amount": "0",
"consent[terms_of_service]": "accepted",
"tax_id_collection[purchasing_as_business]": "false",
"expected_payment_method_type": "sepa_debit",
"_stripe_version": STRIPE_VERSION,
"guid": self.guid,
"muid": self.muid,
"sid": self.sid,
"key": self.stripe_public_key,
"version": STRIPE_JS_VERSION,
"referrer": "https://chatgpt.com",
"client_attribution_metadata[client_session_id]": self.client_session_id,
"client_attribution_metadata[checkout_session_id]": self.session_id,
"client_attribution_metadata[merchant_integration_source]": "checkout",
"client_attribution_metadata[merchant_integration_version]": "hosted_checkout",
"client_attribution_metadata[payment_method_selection_flow]": "automatic",
"client_attribution_metadata[checkout_config_id]": self.checkout_config_id,
}
if self.init_checksum:
data["init_checksum"] = self.init_checksum
if self.js_checksum:
data["js_checksum"] = self.js_checksum
if captcha_token:
data["passive_captcha_token"] = captcha_token
data["passive_captcha_ekey"] = ""
if rv_timestamp:
data["rv_timestamp"] = rv_timestamp
try:
resp = self.session.post(api_url, data=data, headers=self._get_stripe_headers(), timeout=30)
if resp.status_code == 200:
result = resp.json()
state = result.get("state", "")
if state in ["succeeded", "processing", "processing_subscription"]:
return True, ""
elif state == "failed":
error = result.get("error", {})
error_msg = error.get("message", str(error)) if isinstance(error, dict) else str(error)
return False, f"支付失败: {error_msg}"
else:
# 其他状态继续轮询
return True, ""
error_text = resp.text[:200] if resp.text else "无响应"
return False, f"确认失败: {resp.status_code} - {error_text}"
except Exception as e:
return False, f"确认异常: {e}"
def poll_payment_status(self, max_attempts: int = 20) -> tuple:
"""Step 3: 轮询支付状态
Returns:
tuple: (state: str, error_message: str)
"""
api_url = f"https://api.stripe.com/v1/payment_pages/{self.session_id}/poll?key={self.stripe_public_key}"
for attempt in range(max_attempts):
try:
resp = self.session.get(api_url, headers=self._get_stripe_headers(), timeout=30)
if resp.status_code == 200:
result = resp.json()
state = result.get("state", "")
if state == "succeeded":
return "succeeded", ""
elif state in ["failed", "canceled"]:
return state, f"支付 {state}"
time.sleep(2)
except Exception as e:
log_progress(f"[!] 轮询异常: {e}")
time.sleep(2)
return "timeout", "轮询超时"
def complete_payment(self, iban: str, name: str, email: str,
address: str, city: str, postal_code: str,
country: str = "DE") -> tuple:
"""执行完整支付流程
Returns:
tuple: (success: bool, error_message: str)
"""
# 获取页面参数
self.fetch_checkout_page()
# Step 1: 创建支付方式
payment_method_id = self.create_payment_method(
iban, name, email, address, city, postal_code, country
)
if not payment_method_id:
return False, "创建支付方式失败"
# Step 2: 确认支付
success, error = self.confirm_payment(payment_method_id)
if not success:
return False, error
# Step 3: 轮询状态
state, error = self.poll_payment_status(15)
if state == "succeeded":
return True, ""
return False, error or f"支付失败: {state}"
def api_payment_with_retry(checkout_url: str, email: str, session=None, proxy: str = None,
max_retries: int = 3, get_iban_func=None,
get_address_func=None, get_name_func=None,
progress_callback=None) -> tuple:
"""使用 API 完成支付流程(带 IBAN 重试)
Args:
checkout_url: Stripe checkout URL
email: 邮箱地址
session: 可选的 curl_cffi session
proxy: 代理地址
max_retries: 最大重试次数
get_iban_func: 获取 IBAN 的函数,签名: func() -> str
get_address_func: 获取地址的函数,签名: func() -> tuple(street, postal_code, city)
get_name_func: 获取姓名的函数,签名: func() -> str
progress_callback: 进度回调函数
Returns:
tuple: (success: bool, error_message: str)
"""
def log_cb(msg):
if progress_callback:
progress_callback(msg)
else:
log_progress(msg)
# 默认的 IBAN/地址/姓名生成函数
if get_iban_func is None:
def get_iban_func():
# 从 auto_gpt_team 导入
try:
from auto_gpt_team import get_sepa_ibans
ibans = get_sepa_ibans()
return random.choice(ibans) if ibans else ""
except:
return ""
if get_address_func is None:
def get_address_func():
try:
from auto_gpt_team import SEPA_ADDRESSES
return random.choice(SEPA_ADDRESSES)
except:
return ("Alexanderplatz 1", "10178", "Berlin")
if get_name_func is None:
def get_name_func():
try:
from auto_gpt_team import FIRST_NAMES, LAST_NAMES
return f"{random.choice(FIRST_NAMES)} {random.choice(LAST_NAMES)}"
except:
return "Max Mustermann"
# 初始化 Stripe 支付处理器
stripe_handler = StripePaymentAPI(checkout_url, session=session, proxy=proxy)
last_error = ""
for retry in range(max_retries):
# 生成支付信息
iban = get_iban_func()
if not iban:
return False, "没有可用的 IBAN"
street, postal_code, city = get_address_func()
account_name = get_name_func()
if retry == 0:
log_status("API支付", "SEPA 支付处理中...")
log_cb(f"IBAN: {iban[:8]}...")
log_cb(f"地址: {street}, {postal_code} {city}")
log_cb(f"姓名: {account_name}")
else:
log_cb(f"[!] 重试 {retry}/{max_retries-1},更换 IBAN...")
log_cb(f"新 IBAN: {iban[:8]}...")
success, error = stripe_handler.complete_payment(
iban, account_name, email, street, city, postal_code, "DE"
)
if success:
log_status("API支付", "[OK] 支付成功")
return True, ""
last_error = error
# 分析错误类型,决定是否重试
error_lower = error.lower() if error else ""
# IBAN/BIC 相关错误 - 换 IBAN 重试
if "bank_account_unusable" in error_lower or "bic" in error_lower or "iban" in error_lower:
log_cb(f"[!] IBAN 无效: {error}")
if retry < max_retries - 1:
continue
# 可恢复错误 - 重试
retryable_errors = ["400", "500", "timeout", "eof", "connection", "确认失败"]
is_retryable = any(e in error_lower for e in retryable_errors)
if is_retryable and retry < max_retries - 1:
log_cb(f"[!] 支付错误: {error},重试中...")
time.sleep(1)
continue
# 不可恢复错误或重试用尽
break
log_status("API支付", f"[X] 支付失败: {last_error}")
return False, last_error
def is_stripe_api_available() -> bool:
"""检查 Stripe API 模式是否可用"""
return CURL_CFFI_AVAILABLE

View File

@@ -3861,6 +3861,13 @@ class ProvisionerBot:
api_supported = False
current_mode = "browser"
# 获取当前并发数
try:
from config import CONCURRENT_WORKERS, CONCURRENT_ENABLED
current_workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1
except ImportError:
current_workers = 1
keyboard = [
[
InlineKeyboardButton("📋 查看配置", callback_data="autogptplus:config"),
@@ -3887,6 +3894,11 @@ class ProvisionerBot:
InlineKeyboardButton(f"⚙️ 注册模式: {mode_icon} {mode_text}", callback_data="autogptplus:select_mode"),
])
# 添加并发设置按钮
keyboard.append([
InlineKeyboardButton(f"⚡ 并发数: {current_workers}", callback_data="autogptplus:set_concurrent"),
])
keyboard.append([
InlineKeyboardButton("🚀 开始注册", callback_data="autogptplus:register"),
])
@@ -3932,6 +3944,10 @@ class ProvisionerBot:
await self._set_autogptplus_mode(query, sub_action)
elif action == "register":
await self._start_autogptplus_register(query, context)
elif action == "set_concurrent":
await self._show_autogptplus_concurrent_selection(query)
elif action == "concurrent":
await self._set_autogptplus_concurrent(query, sub_action)
elif action == "back":
# 返回主菜单
await query.edit_message_text(
@@ -4086,7 +4102,119 @@ class ProvisionerBot:
reply_markup=reply_markup
)
async def _test_autogptplus_email(self, query):
async def _show_autogptplus_concurrent_selection(self, query):
"""显示并发数选择界面"""
try:
from config import CONCURRENT_WORKERS, CONCURRENT_ENABLED
current_workers = CONCURRENT_WORKERS if CONCURRENT_ENABLED else 1
except ImportError:
current_workers = 1
keyboard = [
[
InlineKeyboardButton(
f"{'' if current_workers == 1 else ''}1 并发",
callback_data="autogptplus:concurrent:1"
),
InlineKeyboardButton(
f"{'' if current_workers == 3 else ''}3 并发",
callback_data="autogptplus:concurrent:3"
),
InlineKeyboardButton(
f"{'' if current_workers == 5 else ''}5 并发",
callback_data="autogptplus:concurrent:5"
),
],
[InlineKeyboardButton("◀️ 返回", callback_data="autogptplus:back")],
]
reply_markup = InlineKeyboardMarkup(keyboard)
await query.edit_message_text(
"<b>⚡ 设置并发数</b>\n\n"
f"当前并发数: <b>{current_workers}</b>\n\n"
"选择注册时的并发数量:\n"
"• 1 并发 - 稳定,适合测试\n"
"• 3 并发 - 平衡速度和稳定性\n"
"• 5 并发 - 最快,需要较好的网络",
parse_mode="HTML",
reply_markup=reply_markup
)
async def _set_autogptplus_concurrent(self, query, workers_str: str):
"""设置并发数"""
try:
workers = int(workers_str)
if workers not in [1, 3, 5]:
await query.answer("❌ 无效的并发数", show_alert=True)
return
except ValueError:
await query.answer("❌ 无效的并发数", show_alert=True)
return
# 更新 config.toml
try:
from config import CONFIG_FILE
# 读取当前配置文件
with open(CONFIG_FILE, "r", encoding="utf-8") as f:
content = f.read()
# 检查是否存在 [concurrent] section
if "[concurrent]" not in content:
# 添加 [concurrent] section
content += f"\n\n[concurrent]\nenabled = true\nworkers = {workers}\n"
else:
# 更新 workers 值
import re
# 更新 enabled
if re.search(r"^\s*enabled\s*=", content, re.MULTILINE):
content = re.sub(
r"^(\s*enabled\s*=\s*).*$",
f"\\g<1>true",
content,
flags=re.MULTILINE
)
else:
# 在 [concurrent] 后添加 enabled
content = content.replace("[concurrent]", "[concurrent]\nenabled = true")
# 更新 workers
if re.search(r"^\s*workers\s*=", content, re.MULTILINE):
content = re.sub(
r"^(\s*workers\s*=\s*).*$",
f"\\g<1>{workers}",
content,
flags=re.MULTILINE
)
else:
# 在 [concurrent] section 中添加 workers
content = re.sub(
r"(\[concurrent\][^\[]*)",
f"\\g<1>workers = {workers}\n",
content
)
# 写回配置文件
with open(CONFIG_FILE, "w", encoding="utf-8") as f:
f.write(content)
# 重载配置
from config import reload_config
reload_config()
await query.answer(f"✅ 并发数已设置为 {workers}", show_alert=True)
# 返回主菜单
await query.edit_message_text(
"<b>🤖 AutoGPTPlus 管理面板</b>\n\n"
"ChatGPT 订阅自动化配置管理\n\n"
"请选择功能:",
parse_mode="HTML",
reply_markup=self._get_autogptplus_main_keyboard()
)
except Exception as e:
await query.answer(f"❌ 设置失败: {e}", show_alert=True)
"""测试 AutoGPTPlus 邮件创建"""
await query.edit_message_text("⏳ 正在测试邮件创建...")