Files
autoTeam/tg_bot.py
2026-01-08 23:07:12 +08:00

199 lines
6.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
import os
import time
from typing import Any

import requests

from autoteam import create_eu_billing_with_browser_sim
def _env_required(name: str) -> str:
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        SystemExit: If the variable is unset or contains only whitespace.
    """
    cleaned = os.environ.get(name, "").strip()
    if cleaned:
        return cleaned
    raise SystemExit(f"Missing required env var: {name}")
def _parse_allowed_ids(env_name: str) -> set[int] | None:
raw = os.getenv(env_name, "").strip()
if not raw:
return None
ids: set[int] = set()
for part in raw.split(","):
part = part.strip()
if not part:
continue
ids.add(int(part))
return ids
class TgBot:
def __init__(self, token: str):
self._token = token
self._base = f"https://api.telegram.org/bot{token}"
self._offset: int | None = None
def _post(self, method: str, payload: dict[str, Any]) -> Any:
resp = requests.post(f"{self._base}/{method}", json=payload, timeout=60)
resp.raise_for_status()
data = resp.json()
if not data.get("ok"):
raise RuntimeError(str(data))
return data["result"]
def _get(self, method: str, params: dict[str, Any]) -> Any:
resp = requests.get(f"{self._base}/{method}", params=params, timeout=60)
resp.raise_for_status()
data = resp.json()
if not data.get("ok"):
raise RuntimeError(str(data))
return data["result"]
def send_message(
self,
chat_id: int,
text: str,
*,
reply_to_message_id: int | None = None,
disable_web_page_preview: bool = True,
) -> None:
payload: dict[str, Any] = {
"chat_id": chat_id,
"text": text,
"disable_web_page_preview": disable_web_page_preview,
}
if reply_to_message_id is not None:
payload["reply_to_message_id"] = reply_to_message_id
self._post("sendMessage", payload)
def get_updates(self, *, timeout: int = 30) -> list[dict[str, Any]]:
params: dict[str, Any] = {
"timeout": timeout,
"allowed_updates": ["message"],
}
if self._offset is not None:
params["offset"] = self._offset
updates = self._get("getUpdates", params)
if updates:
self._offset = int(updates[-1]["update_id"]) + 1
return updates
def _help_text() -> str:
    """Return the multi-line usage message listing the bot's commands."""
    usage_lines = [
        "用法:",
        " /gen <access_token> 生成 checkout URL",
        " /id 显示你的 user_id",
        " /help 帮助",
        "",  # blank line separating the command list from the advice
        "建议仅在私聊中使用并开启白名单TG_ALLOWED_USER_IDS",
    ]
    return "\n".join(usage_lines)
def _extract_access_token(text: str) -> str | None:
text = (text or "").strip()
if not text:
return None
if text.startswith("/gen"):
parts = text.split(maxsplit=1)
if len(parts) == 2:
return parts[1].strip()
return None
if text.startswith("/start") or text.startswith("/help") or text.startswith("/id"):
return None
return None
def main() -> None:
    """Read the bot configuration from the environment and start polling.

    Environment variables read:
        TG_BOT_TOKEN: required bot token.
        TG_ALLOWED_USER_IDS: optional comma-separated user ids.
        TG_ALLOW_GROUPS: enabled when set to ``1``, ``true`` or ``yes``
            (case-insensitive, surrounding whitespace ignored).
    """
    truthy = {"1", "true", "yes"}
    run(
        token=_env_required("TG_BOT_TOKEN"),
        allowed_user_ids=_parse_allowed_ids("TG_ALLOWED_USER_IDS"),
        allow_groups=os.getenv("TG_ALLOW_GROUPS", "").strip().lower() in truthy,
    )
def _redact_token(text: str, token: str) -> str:
    """Return *text* with every occurrence of the bot *token* masked."""
    # Guard against an empty token: "".replace("", x) would insert x everywhere.
    return text.replace(token, "<redacted>") if token else text


def _handle_update(
    bot: "TgBot",
    upd: dict[str, Any],
    *,
    allowed_user_ids: set[int] | None,
    allow_groups: bool,
) -> None:
    """Process one update dict: apply access checks, then dispatch the command."""
    msg = upd.get("message") or {}
    text = (msg.get("text") or "").strip()
    chat = msg.get("chat") or {}
    from_user = msg.get("from") or {}
    raw_chat_id = chat.get("id")
    raw_user_id = from_user.get("id")
    if raw_chat_id is None or raw_user_id is None:
        return
    chat_id = int(raw_chat_id)
    user_id = int(raw_user_id)

    # Users outside the whitelist are ignored silently (no reply at all).
    if allowed_user_ids is not None and user_id not in allowed_user_ids:
        return

    message_id = msg.get("message_id")
    reply_to = int(message_id) if message_id is not None else None

    def reply(body: str) -> None:
        bot.send_message(chat_id, body, reply_to_message_id=reply_to)

    if not allow_groups and chat.get("type") != "private":
        reply("请在私聊中使用该 bot或设置 TG_ALLOW_GROUPS=1")
        return

    if text.startswith(("/start", "/help")):
        reply(_help_text())
        return

    if text.startswith("/id"):
        reply(f"user_id: {user_id}")
        return

    access_token = _extract_access_token(text)
    if not access_token:
        if text.startswith("/gen"):
            reply("请提供 access_token/gen <access_token>")
        return

    reply("正在生成,请稍等…")
    try:
        result = create_eu_billing_with_browser_sim(access_token)
    except Exception as exc:
        # Tell the user the request failed instead of leaving them waiting,
        # then re-raise so the caller logs the error.
        reply(f"失败:{exc}")
        raise
    if result.ok and result.checkout_url:
        reply(f"Checkout URL:\n{result.checkout_url}")
    else:
        reply(f"失败:{result.error}")


def run(*, token: str, allowed_user_ids: set[int] | None, allow_groups: bool) -> None:
    """Poll Telegram for messages and answer bot commands until interrupted.

    Args:
        token: Bot token used to build the API client.
        allowed_user_ids: When not None, only messages from these user ids are
            handled; all others are dropped without a reply.
        allow_groups: When False, messages from non-private chats get a notice
            asking to use a private chat instead of being processed.

    Each update is handled in isolation: an error while processing one update
    is logged and the remaining updates of the same batch are still handled
    (the polling offset has already moved past them, so they would otherwise
    be lost). Polling errors are logged and retried after a short pause.
    Returns on KeyboardInterrupt.
    """
    bot = TgBot(token)
    print("Telegram bot started (polling)...")
    try:
        while True:
            try:
                updates = bot.get_updates(timeout=30)
            except Exception as e:
                # requests error messages embed the URL, which holds the token.
                print(f"[warn] polling error: {_redact_token(str(e), token)}")
                time.sleep(2)
                continue
            for upd in updates:
                try:
                    _handle_update(
                        bot,
                        upd,
                        allowed_user_ids=allowed_user_ids,
                        allow_groups=allow_groups,
                    )
                except Exception as e:
                    print(f"[warn] update error: {_redact_token(str(e), token)}")
    except KeyboardInterrupt:
        print("Stopped.")
        return
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()