-
Notifications
You must be signed in to change notification settings - Fork 0
/
04_webhook_handler_aiohttp.py
29 lines (22 loc) · 1.03 KB
/
04_webhook_handler_aiohttp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import os
from aiohttp import web
from telegram_wallet_pay.schemas import WebhookMessages
from telegram_wallet_pay.tools.aiohttp import check_signature
# Wallet Pay API token, read from the TELEGRAM_WALLET_PAY_TOKEN environment
# variable (keep it in your .env and make sure it is exported before start-up;
# this script does not load .env files itself).
# A token can be issued via https://pay.wallet.tg/
# The value is None when the variable is not set.
TELEGRAM_WALLET_PAY_TOKEN = os.environ.get("TELEGRAM_WALLET_PAY_TOKEN")
# add `check_signature` decorator to your handler
@check_signature(TELEGRAM_WALLET_PAY_TOKEN)
async def webhook_handler(request: web.Request) -> web.StreamResponse:
    """Receive a Telegram Wallet webhook call and acknowledge it.

    The raw request body is parsed into ``WebhookMessages`` and every
    contained message is printed. Replace the ``print`` with your own
    processing, e.g. storing the message in a database.

    The handler is wrapped by ``check_signature`` with the wallet token;
    presumably that decorator rejects requests with a bad signature --
    confirm against the telegram_wallet_pay documentation.

    Args:
        request: Incoming aiohttp request whose body is the webhook JSON.

    Returns:
        A JSON response with the body ``{"success": true}``.
    """
    raw_body = await request.text()
    parsed_messages = WebhookMessages.model_validate_json(raw_body)

    # Process every webhook message as you wish; this example only prints it.
    for webhook_message in parsed_messages:
        print(f"Received webhook message: {webhook_message}")

    acknowledgement = {"success": True}
    return web.json_response(acknowledgement)
if __name__ == "__main__":
    # Build the aiohttp application, expose the webhook handler on
    # POST /wallet, and start serving with aiohttp's default host/port.
    application = web.Application()
    application.add_routes([web.post("/wallet", webhook_handler)])
    web.run_app(application)