dxy_ya_bot/bot.py

88 lines
3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import requests
import json
import os
from telegram import Update, ReplyKeyboardMarkup
from telegram.ext import ApplicationBuilder, ContextTypes, CommandHandler, MessageHandler, ConversationHandler, filters
from dotenv import load_dotenv
logging.basicConfig(
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
level=logging.INFO
)
logger = logging.getLogger(__name__)
load_dotenv()
bot_token = os.getenv('BOT_TOKEN')
ya_token = os.getenv('YA_TOKEN')
whitelisted = os.getenv('WHITELISTED')
ya_url = 'https://partner2.yandex.ru/api/statistics2/get.json'
msg_to_period = {
"Сегодня": "today",
"Вчера": "yesterday",
"Месяц": "30days"
}
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
if update.effective_user.username not in whitelisted:
await update.message.reply_text("Запрет")
return ConversationHandler.END
reply_keyboard = [["Сегодня", "Вчера", "Месяц"]]
await update.message.reply_text("Какую стату вывести", reply_markup=ReplyKeyboardMarkup(reply_keyboard))
return 0
async def ya_api(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
reply = await fetch_data(update.message.text)
await update.message.reply_text(reply)
return 0
async def fetch_data(msg_period: str) -> str:
response = requests.get(ya_url, headers={'Authorization': ya_token}, params={
'lang': 'ru',
'period': msg_to_period[msg_period],
'field': ['shows', 'hits_render', 'partner_wo_nds', 'cpmv_partner_wo_nds']
})
json_data = json.loads(response.text)
data = json_data['data']['totals']['2'][0]
return f"Данные за {msg_period}\n\n\
Подборы рекламы: {data['hits_render']}\n\
Видимые показы: {data['shows']}\n\
Вознаграждение партнеру за рекламу: {data['partner_wo_nds']} руб.\n\
Стоимость 1к видимых показов: {data['cpmv_partner_wo_nds']} руб."
async def cancel(update: Update, context: ContextTypes.DEFAULT_TYPE) -> int:
await update.message.reply_text("домой")
return ConversationHandler.END
if __name__ == '__main__':
if ya_token is None:
logger.fatal("YA_TOKEN env variable is required")
exit(1)
if bot_token is None:
logger.fatal("BOT_TOKEN env variable is required")
exit(1)
if whitelisted is None:
logger.fatal("WHITELISTED env varialbe is required")
exit(1)
whitelisted = whitelisted.split(',')
application = ApplicationBuilder().token(bot_token).build()
conv_handler = ConversationHandler(
entry_points=[CommandHandler('start', start)],
states={
0: [MessageHandler(filters.Regex("^(Сегодня|Вчера|Месяц)$"), ya_api)]
},
fallbacks=[CommandHandler('cancel', cancel)]
)
application.add_handler(conv_handler)
application.run_polling()