199 lines
6.7 KiB
Python
199 lines
6.7 KiB
Python
import os
|
||
import time
|
||
from typing import Any
|
||
|
||
import requests
|
||
|
||
from autoteam import create_eu_billing_with_browser_sim
|
||
|
||
|
||
def _env_required(name: str) -> str:
    """Return the whitespace-stripped value of environment variable *name*.

    Raises:
        SystemExit: if the variable is unset or blank after stripping.
    """
    cleaned = os.environ.get(name, "").strip()
    if cleaned:
        return cleaned
    # Exit with a readable message instead of a traceback: this is a
    # start-up configuration error, not a programming error.
    raise SystemExit(f"Missing required env var: {name}")
|
||
|
||
|
||
def _parse_allowed_ids(env_name: str) -> set[int] | None:
|
||
raw = os.getenv(env_name, "").strip()
|
||
if not raw:
|
||
return None
|
||
ids: set[int] = set()
|
||
for part in raw.split(","):
|
||
part = part.strip()
|
||
if not part:
|
||
continue
|
||
ids.add(int(part))
|
||
return ids
|
||
|
||
|
||
class TgBot:
|
||
def __init__(self, token: str):
|
||
self._token = token
|
||
self._base = f"https://api.telegram.org/bot{token}"
|
||
self._offset: int | None = None
|
||
|
||
def _post(self, method: str, payload: dict[str, Any]) -> Any:
|
||
resp = requests.post(f"{self._base}/{method}", json=payload, timeout=60)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
if not data.get("ok"):
|
||
raise RuntimeError(str(data))
|
||
return data["result"]
|
||
|
||
def _get(self, method: str, params: dict[str, Any]) -> Any:
|
||
resp = requests.get(f"{self._base}/{method}", params=params, timeout=60)
|
||
resp.raise_for_status()
|
||
data = resp.json()
|
||
if not data.get("ok"):
|
||
raise RuntimeError(str(data))
|
||
return data["result"]
|
||
|
||
def send_message(
|
||
self,
|
||
chat_id: int,
|
||
text: str,
|
||
*,
|
||
reply_to_message_id: int | None = None,
|
||
disable_web_page_preview: bool = True,
|
||
) -> None:
|
||
payload: dict[str, Any] = {
|
||
"chat_id": chat_id,
|
||
"text": text,
|
||
"disable_web_page_preview": disable_web_page_preview,
|
||
}
|
||
if reply_to_message_id is not None:
|
||
payload["reply_to_message_id"] = reply_to_message_id
|
||
self._post("sendMessage", payload)
|
||
|
||
def get_updates(self, *, timeout: int = 30) -> list[dict[str, Any]]:
|
||
params: dict[str, Any] = {
|
||
"timeout": timeout,
|
||
"allowed_updates": ["message"],
|
||
}
|
||
if self._offset is not None:
|
||
params["offset"] = self._offset
|
||
updates = self._get("getUpdates", params)
|
||
if updates:
|
||
self._offset = int(updates[-1]["update_id"]) + 1
|
||
return updates
|
||
|
||
|
||
def _help_text() -> str:
    """Return the usage message sent in reply to ``/start`` and ``/help``.

    The text (in Chinese) lists the ``/gen``, ``/id`` and ``/help`` commands
    and recommends private-chat use with the TG_ALLOWED_USER_IDS whitelist.
    """
    lines = (
        "用法:",
        " /gen <access_token> 生成 checkout URL",
        " /id 显示你的 user_id",
        " /help 帮助",
        "",  # blank line separating the command list from the advice
        "建议仅在私聊中使用,并开启白名单(TG_ALLOWED_USER_IDS)。",
    )
    return "\n".join(lines)
|
||
|
||
|
||
def _extract_access_token(text: str) -> str | None:
|
||
text = (text or "").strip()
|
||
if not text:
|
||
return None
|
||
if text.startswith("/gen"):
|
||
parts = text.split(maxsplit=1)
|
||
if len(parts) == 2:
|
||
return parts[1].strip()
|
||
return None
|
||
if text.startswith("/start") or text.startswith("/help") or text.startswith("/id"):
|
||
return None
|
||
return None
|
||
|
||
|
||
def main() -> None:
    """Read the bot configuration from the environment and start polling.

    Environment variables read here:
        TG_BOT_TOKEN: required; the process exits if it is missing or blank.
        TG_ALLOWED_USER_IDS: optional comma-separated user-id whitelist.
        TG_ALLOW_GROUPS: group chats are allowed when this is ``1``,
            ``true`` or ``yes`` (case-insensitive).
    """
    truthy = {"1", "true", "yes"}
    groups_flag = os.getenv("TG_ALLOW_GROUPS", "").strip().lower()

    run(
        token=_env_required("TG_BOT_TOKEN"),
        allowed_user_ids=_parse_allowed_ids("TG_ALLOWED_USER_IDS"),
        allow_groups=groups_flag in truthy,
    )
|
||
|
||
|
||
def _reply(bot: "TgBot", chat_id: Any, message_id: Any, text: str) -> None:
    """Send *text* to *chat_id*, as a reply to *message_id* when it is known."""
    bot.send_message(
        int(chat_id),
        text,
        reply_to_message_id=int(message_id) if message_id is not None else None,
    )


def _handle_update(
    bot: "TgBot",
    upd: dict[str, Any],
    *,
    allowed_user_ids: set[int] | None,
    allow_groups: bool,
) -> None:
    """Process one update: apply access checks, then dispatch the command."""
    msg = upd.get("message") or {}
    text = (msg.get("text") or "").strip()
    chat = msg.get("chat") or {}
    from_user = msg.get("from") or {}
    chat_id = chat.get("id")
    user_id = from_user.get("id")
    message_id = msg.get("message_id")

    # Without a chat or a sender there is nobody to answer.
    if chat_id is None or user_id is None:
        return

    # Users outside the whitelist are ignored silently (no reply at all).
    if allowed_user_ids is not None and int(user_id) not in allowed_user_ids:
        return

    if not allow_groups and chat.get("type") != "private":
        _reply(bot, chat_id, message_id, "请在私聊中使用该 bot(或设置 TG_ALLOW_GROUPS=1)。")
        return

    if text.startswith(("/start", "/help")):
        _reply(bot, chat_id, message_id, _help_text())
        return

    if text.startswith("/id"):
        _reply(bot, chat_id, message_id, f"user_id: {int(user_id)}")
        return

    access_token = _extract_access_token(text)
    if not access_token:
        if text.startswith("/gen"):
            _reply(bot, chat_id, message_id, "请提供 access_token:/gen <access_token>")
        return

    _reply(bot, chat_id, message_id, "正在生成,请稍等…")
    try:
        result = create_eu_billing_with_browser_sim(access_token)
    except Exception as exc:
        # The user has already been told to wait; report the failure instead
        # of leaving the request unanswered.
        _reply(bot, chat_id, message_id, f"失败:{exc}")
        return

    if result.ok and result.checkout_url:
        _reply(bot, chat_id, message_id, f"Checkout URL:\n{result.checkout_url}")
    else:
        _reply(bot, chat_id, message_id, f"失败:{result.error}")


def run(*, token: str, allowed_user_ids: set[int] | None, allow_groups: bool) -> None:
    """Poll Telegram for messages and answer bot commands until interrupted.

    Args:
        token: Bot API token used to construct the client.
        allowed_user_ids: When not ``None``, only these user ids are served;
            messages from anyone else are ignored.
        allow_groups: When false, messages from non-private chats get a
            notice asking the user to use a private chat.

    Polling errors are logged and retried after a two-second pause. An error
    while handling one update is logged and does not affect the remaining
    updates of the same batch. Ctrl-C stops the loop and returns.
    """
    bot = TgBot(token)
    print("Telegram bot started (polling)...")

    def _safe(exc: BaseException) -> str:
        # requests exceptions embed the request URL, which contains the bot
        # token; keep the token out of the log output.
        message = str(exc)
        return message.replace(token, "<token>") if token else message

    try:
        while True:
            try:
                updates = bot.get_updates(timeout=30)
            except Exception as e:
                print(f"[warn] polling error: {_safe(e)}")
                time.sleep(2)
                continue

            for upd in updates:
                # The client has already advanced its offset past this batch,
                # so a failure must not abort the remaining updates: they
                # would never be delivered again.
                try:
                    _handle_update(
                        bot,
                        upd,
                        allowed_user_ids=allowed_user_ids,
                        allow_groups=allow_groups,
                    )
                except Exception as e:
                    update_id = upd.get("update_id") if isinstance(upd, dict) else None
                    print(f"[warn] failed to handle update {update_id}: {_safe(e)}")
    except KeyboardInterrupt:
        # Covers interrupts raised while polling, handling or backing off.
        print("Stopped.")
        return
|
||
|
||
|
||
# Start the bot only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|