"""Telegram long-polling bot that turns an access token into a checkout URL.

Configuration is read from environment variables:

* ``TG_BOT_TOKEN`` (required): Telegram Bot API token.
* ``TG_ALLOWED_USER_IDS`` (optional): comma-separated whitelist of user ids.
* ``TG_ALLOW_GROUPS`` (optional): ``1``/``true``/``yes`` to serve non-private
  chats as well.
"""

import json
import os
import time
from typing import Any

import requests
from autoteam import create_eu_billing_with_browser_sim

# Commands this bot reacts to; anything else is ignored silently.
_KNOWN_COMMANDS = frozenset({"/start", "/help", "/id", "/gen"})


def _env_required(name: str) -> str:
    """Return the stripped value of env var *name*; exit if unset or blank."""
    value = os.getenv(name, "").strip()
    if not value:
        raise SystemExit(f"Missing required env var: {name}")
    return value


def _parse_allowed_ids(env_name: str) -> set[int] | None:
    """Parse a comma-separated list of integer ids from env var *env_name*.

    Returns:
        ``None`` when the variable is unset or blank (no whitelist),
        otherwise the set of parsed ids. Empty items are skipped, so a value
        consisting only of separators yields an empty set, which matches no
        user at all.

    Raises:
        ValueError: if an item is not a valid integer.
    """
    raw = os.getenv(env_name, "").strip()
    if not raw:
        return None
    ids: set[int] = set()
    for part in raw.split(","):
        part = part.strip()
        if not part:
            continue
        try:
            ids.add(int(part))
        except ValueError as err:
            # Name the variable and the offending item; the bare int() error
            # gives the operator no hint where the bad value came from.
            raise ValueError(
                f"Invalid integer id in {env_name}: {part!r}"
            ) from err
    return ids


def _redact(text: str, secret: str) -> str:
    """Return *text* with every occurrence of *secret* masked."""
    return text.replace(secret, "<redacted>") if secret else text


class TgBot:
    """Minimal Telegram Bot API client built on ``requests``."""

    def __init__(self, token: str):
        self._token = token
        self._base = f"https://api.telegram.org/bot{token}"
        # Next update_id to request; None until the first update is seen.
        self._offset: int | None = None

    @staticmethod
    def _unwrap(resp: requests.Response) -> Any:
        """Validate a Bot API response and return its ``result`` field.

        Raises:
            requests.HTTPError: on a non-2xx HTTP status.
            RuntimeError: when the JSON body does not report ``ok``.
        """
        resp.raise_for_status()
        data = resp.json()
        if not data.get("ok"):
            raise RuntimeError(str(data))
        return data["result"]

    def _post(self, method: str, payload: dict[str, Any]) -> Any:
        """POST *payload* as JSON to Bot API *method* and return the result."""
        resp = requests.post(f"{self._base}/{method}", json=payload, timeout=60)
        return self._unwrap(resp)

    def _get(
        self, method: str, params: dict[str, Any], *, timeout: float = 60
    ) -> Any:
        """GET Bot API *method* with query *params* and return the result.

        *timeout* is the HTTP timeout in seconds.
        """
        resp = requests.get(
            f"{self._base}/{method}", params=params, timeout=timeout
        )
        return self._unwrap(resp)

    def send_message(
        self,
        chat_id: int,
        text: str,
        *,
        reply_to_message_id: int | None = None,
        disable_web_page_preview: bool = True,
    ) -> None:
        """Send *text* to *chat_id*, optionally as a reply to a message."""
        payload: dict[str, Any] = {
            "chat_id": chat_id,
            "text": text,
            "disable_web_page_preview": disable_web_page_preview,
        }
        if reply_to_message_id is not None:
            payload["reply_to_message_id"] = reply_to_message_id
        self._post("sendMessage", payload)

    def get_updates(self, *, timeout: int = 30) -> list[dict[str, Any]]:
        """Long-poll for new message updates and advance the stored offset.

        Args:
            timeout: long-poll duration in seconds passed to ``getUpdates``.

        Returns:
            The list of update dicts returned by the API (possibly empty).
        """
        params: dict[str, Any] = {
            "timeout": timeout,
            # In a query string an array must be sent as JSON text; a Python
            # list would be encoded as repeated ``allowed_updates=message``
            # pairs, which is not a JSON-serialized list.
            "allowed_updates": json.dumps(["message"]),
        }
        if self._offset is not None:
            params["offset"] = self._offset
        # Keep the HTTP timeout above the long-poll duration so the request
        # is not aborted while the server is still legitimately waiting.
        updates = self._get(
            "getUpdates", params, timeout=max(60, timeout + 10)
        )
        if updates:
            self._offset = int(updates[-1]["update_id"]) + 1
        return updates


def _help_text() -> str:
    """Return the usage text sent in reply to /start and /help."""
    return (
        "用法:\n"
        " /gen <access_token> 生成 checkout URL\n"
        " /id 显示你的 user_id\n"
        " /help 帮助\n\n"
        "建议仅在私聊中使用,并开启白名单(TG_ALLOWED_USER_IDS)。"
    )


def _parse_command(text: str) -> tuple[str, str]:
    """Split message text into ``(command, argument)``.

    The command is the first whitespace-delimited word with any ``@botname``
    suffix removed; the argument is the stripped remainder. Both are empty
    strings when *text* does not start with ``/``. The ``@botname`` part is
    not compared against this bot's own username.
    """
    text = (text or "").strip()
    if not text.startswith("/"):
        return "", ""
    parts = text.split(maxsplit=1)
    command = parts[0].split("@", 1)[0]
    argument = parts[1].strip() if len(parts) == 2 else ""
    return command, argument


def _extract_access_token(text: str) -> str | None:
    """Return the argument of a ``/gen`` command, or None if there is none.

    Only the exact command ``/gen`` (optionally ``/gen@botname``) is
    recognised; other text, including words that merely start with ``/gen``,
    yields None.
    """
    command, argument = _parse_command(text)
    if command == "/gen" and argument:
        return argument
    return None


def main() -> None:
    """Read configuration from the environment and start the polling loop."""
    token = _env_required("TG_BOT_TOKEN")
    allowed_user_ids = _parse_allowed_ids("TG_ALLOWED_USER_IDS")
    allow_groups = os.getenv("TG_ALLOW_GROUPS", "").strip().lower() in {
        "1",
        "true",
        "yes",
    }
    run(token=token, allowed_user_ids=allowed_user_ids, allow_groups=allow_groups)


def _handle_update(
    bot: TgBot,
    upd: dict[str, Any],
    *,
    allowed_user_ids: set[int] | None,
    allow_groups: bool,
) -> None:
    """Process one update: apply access checks and answer a known command.

    Updates without a chat id or sender id, from users outside the whitelist,
    or whose text is not one of the known commands are ignored silently.
    Exceptions from the Bot API or from the checkout generator propagate to
    the caller.
    """
    msg = upd.get("message") or {}
    chat = msg.get("chat") or {}
    from_user = msg.get("from") or {}
    raw_chat_id = chat.get("id")
    raw_user_id = from_user.get("id")
    if raw_chat_id is None or raw_user_id is None:
        return
    chat_id = int(raw_chat_id)
    user_id = int(raw_user_id)
    if allowed_user_ids is not None and user_id not in allowed_user_ids:
        return

    command, argument = _parse_command(msg.get("text") or "")
    if command not in _KNOWN_COMMANDS:
        return

    message_id = msg.get("message_id")
    reply_to = int(message_id) if message_id is not None else None

    def reply(text: str) -> None:
        bot.send_message(chat_id, text, reply_to_message_id=reply_to)

    # Checked after command recognition so that ordinary group chatter does
    # not trigger a notice on every message.
    if not allow_groups and chat.get("type") != "private":
        reply("请在私聊中使用该 bot(或设置 TG_ALLOW_GROUPS=1)。")
        return

    if command in ("/start", "/help"):
        reply(_help_text())
        return

    if command == "/id":
        reply(f"user_id: {user_id}")
        return

    # Only /gen remains.
    if not argument:
        reply("请提供 access_token:/gen <access_token>")
        return

    reply("正在生成,请稍等…")
    try:
        result = create_eu_billing_with_browser_sim(argument)
    except Exception as exc:
        # Tell the user instead of leaving them at "please wait"; only the
        # exception type is sent to the chat, the details go to the log via
        # the caller.
        reply(f"失败:{type(exc).__name__}")
        raise
    if result.ok and result.checkout_url:
        reply(f"Checkout URL:\n{result.checkout_url}")
    else:
        reply(f"失败:{result.error}")


def run(*, token: str, allowed_user_ids: set[int] | None, allow_groups: bool) -> None:
    """Poll Telegram forever and dispatch each update until interrupted.

    Args:
        token: Telegram Bot API token.
        allowed_user_ids: whitelist of sender ids, or None to accept anyone.
        allow_groups: whether commands from non-private chats are served.

    Polling errors are logged and retried after a short pause. An error while
    handling one update is logged and the rest of the batch is still
    processed. Returns on KeyboardInterrupt.
    """
    bot = TgBot(token)
    print("Telegram bot started (polling)...")
    try:
        while True:
            try:
                updates = bot.get_updates(timeout=30)
            except Exception as e:
                # requests' error text includes the request URL, which embeds
                # the bot token, so mask it before printing.
                print(f"[warn] polling error: {_redact(str(e), token)}")
                time.sleep(2)
                continue
            for upd in updates:
                # The offset has already advanced past this whole batch, so
                # one failing update must not discard the ones after it.
                try:
                    _handle_update(
                        bot,
                        upd,
                        allowed_user_ids=allowed_user_ids,
                        allow_groups=allow_groups,
                    )
                except Exception as e:
                    print(
                        f"[warn] update handling error: {_redact(str(e), token)}"
                    )
    except KeyboardInterrupt:
        print("Stopped.")


if __name__ == "__main__":
    main()