This commit is contained in:
dela
2026-01-08 23:07:12 +08:00
commit 7eaae93c4f
8 changed files with 537 additions and 0 deletions

198
tg_bot.py Normal file
View File

@@ -0,0 +1,198 @@
import os
import time
from typing import Any
import requests
from autoteam import create_eu_billing_with_browser_sim
def _env_required(name: str) -> str:
value = os.getenv(name, "").strip()
if not value:
raise SystemExit(f"Missing required env var: {name}")
return value
def _parse_allowed_ids(env_name: str) -> set[int] | None:
raw = os.getenv(env_name, "").strip()
if not raw:
return None
ids: set[int] = set()
for part in raw.split(","):
part = part.strip()
if not part:
continue
ids.add(int(part))
return ids
class TgBot:
def __init__(self, token: str):
self._token = token
self._base = f"https://api.telegram.org/bot{token}"
self._offset: int | None = None
def _post(self, method: str, payload: dict[str, Any]) -> Any:
resp = requests.post(f"{self._base}/{method}", json=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
if not data.get("ok"):
raise RuntimeError(str(data))
return data["result"]
def _get(self, method: str, params: dict[str, Any]) -> Any:
resp = requests.get(f"{self._base}/{method}", params=params, timeout=60)
resp.raise_for_status()
data = resp.json()
if not data.get("ok"):
raise RuntimeError(str(data))
return data["result"]
def send_message(
self,
chat_id: int,
text: str,
*,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool = True,
) -> None:
payload: dict[str, Any] = {
"chat_id": chat_id,
"text": text,
"disable_web_page_preview": disable_web_page_preview,
}
if reply_to_message_id is not None:
payload["reply_to_message_id"] = reply_to_message_id
self._post("sendMessage", payload)
def get_updates(self, *, timeout: int = 30) -> list[dict[str, Any]]:
params: dict[str, Any] = {
"timeout": timeout,
"allowed_updates": ["message"],
}
if self._offset is not None:
params["offset"] = self._offset
updates = self._get("getUpdates", params)
if updates:
self._offset = int(updates[-1]["update_id"]) + 1
return updates
def _help_text() -> str:
return (
"用法:\n"
" /gen <access_token> 生成 checkout URL\n"
" /id 显示你的 user_id\n"
" /help 帮助\n\n"
"建议仅在私聊中使用并开启白名单TG_ALLOWED_USER_IDS"
)
def _extract_access_token(text: str) -> str | None:
text = (text or "").strip()
if not text:
return None
if text.startswith("/gen"):
parts = text.split(maxsplit=1)
if len(parts) == 2:
return parts[1].strip()
return None
if text.startswith("/start") or text.startswith("/help") or text.startswith("/id"):
return None
return None
def main() -> None:
token = _env_required("TG_BOT_TOKEN")
allowed_user_ids = _parse_allowed_ids("TG_ALLOWED_USER_IDS")
allow_groups = os.getenv("TG_ALLOW_GROUPS", "").strip().lower() in {"1", "true", "yes"}
run(token=token, allowed_user_ids=allowed_user_ids, allow_groups=allow_groups)
def run(*, token: str, allowed_user_ids: set[int] | None, allow_groups: bool) -> None:
bot = TgBot(token)
print("Telegram bot started (polling)...")
while True:
try:
updates = bot.get_updates(timeout=30)
for upd in updates:
msg = upd.get("message") or {}
text = (msg.get("text") or "").strip()
chat = msg.get("chat") or {}
chat_id = chat.get("id")
chat_type = chat.get("type")
from_user = msg.get("from") or {}
user_id = from_user.get("id")
message_id = msg.get("message_id")
if chat_id is None or user_id is None:
continue
if allowed_user_ids is not None and int(user_id) not in allowed_user_ids:
continue
if not allow_groups and chat_type != "private":
bot.send_message(
int(chat_id),
"请在私聊中使用该 bot或设置 TG_ALLOW_GROUPS=1",
reply_to_message_id=int(message_id) if message_id is not None else None,
)
continue
if text.startswith("/start") or text.startswith("/help"):
bot.send_message(
int(chat_id),
_help_text(),
reply_to_message_id=int(message_id) if message_id is not None else None,
)
continue
if text.startswith("/id"):
bot.send_message(
int(chat_id),
f"user_id: {int(user_id)}",
reply_to_message_id=int(message_id) if message_id is not None else None,
)
continue
access_token = _extract_access_token(text)
if text.startswith("/gen") and not access_token:
bot.send_message(
int(chat_id),
"请提供 access_token/gen <access_token>",
reply_to_message_id=int(message_id) if message_id is not None else None,
)
continue
if access_token:
bot.send_message(
int(chat_id),
"正在生成,请稍等…",
reply_to_message_id=int(message_id) if message_id is not None else None,
)
result = create_eu_billing_with_browser_sim(access_token)
if result.ok and result.checkout_url:
bot.send_message(
int(chat_id),
f"Checkout URL:\n{result.checkout_url}",
reply_to_message_id=int(message_id) if message_id is not None else None,
)
else:
bot.send_message(
int(chat_id),
f"失败:{result.error}",
reply_to_message_id=int(message_id) if message_id is not None else None,
)
except KeyboardInterrupt:
print("Stopped.")
return
except Exception as e:
print(f"[warn] polling error: {e}")
time.sleep(2)
if __name__ == "__main__":
main()