From 3e5bff2cb573eceac86ad86904329712a526f5db Mon Sep 17 00:00:00 2001 From: Masahiko AMANO Date: Sun, 5 Jan 2025 23:51:45 +0300 Subject: [PATCH] init(bot): add Telegram bot --- bot/requirements.txt | 7 ++ bot/src/db.py | 63 +++++++++++++++++ bot/src/snb.py | 165 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 235 insertions(+) create mode 100644 bot/requirements.txt create mode 100644 bot/src/db.py create mode 100755 bot/src/snb.py diff --git a/bot/requirements.txt b/bot/requirements.txt new file mode 100644 index 0000000..65a2bde --- /dev/null +++ b/bot/requirements.txt @@ -0,0 +1,7 @@ +certifi==2024.12.14 +charset-normalizer==3.4.1 +idna==3.10 +psycopg2-binary==2.9.10 +pyTelegramBotAPI==4.26.0 +requests==2.32.3 +urllib3==2.3.0 diff --git a/bot/src/db.py b/bot/src/db.py new file mode 100644 index 0000000..54b3284 --- /dev/null +++ b/bot/src/db.py @@ -0,0 +1,63 @@ +from psycopg2 import connect as db_connect +from psycopg2.extras import DictCursor + + +class Database: + def __init__(self, **kwargs): + self.conn = db_connect(**kwargs) + self.conn.autocommit = True + + def close(self): + self.conn.close() + + # get all users + def get_users(self): + with self.conn.cursor(cursor_factory=DictCursor) as cursor: + cursor.execute("SELECT * FROM users") + return cursor.fetchall() + + # check if message sender is authorized + def check_user(self, uid): + with self.conn.cursor() as cursor: + cursor.execute("SELECT 1 FROM users WHERE telegram_id=%s", (uid,)) + return cursor.fetchone() is not None + + # check if user is an editor + def check_editor(self, msg): + with self.conn.cursor() as cursor: + cursor.execute("SELECT is_editor FROM users WHERE telegram_id=%s", (msg.from_user.id,)) + return cursor.fetchone()[0] + + # get quotes range + def get_quotes(self, start=0, count="ALL"): + with self.conn.cursor(cursor_factory=DictCursor) as cursor: + cursor.execute("SELECT q.id, q.text, to_char(q.datetime, 'DD.MM.YYYY HH24:MI:SS') AS datetime, q.author, q.creator_id, u.name AS creator_name FROM quotes q JOIN users u ON q.creator_id = u.id WHERE NOT q.is_removed ORDER BY q.datetime DESC OFFSET %s LIMIT %s", (start, count)) + return cursor.fetchall() + + # get single quote + def get_quote(self, id): + with self.conn.cursor(cursor_factory=DictCursor) as cursor: + cursor.execute("SELECT q.id, q.text, to_char(q.datetime, 'DD.MM.YYYY HH24:MI:SS') AS datetime, q.author, q.creator_id, u.name AS creator_name FROM quotes q JOIN users u ON q.creator_id = u.id WHERE NOT q.is_removed AND q.id=%s", (id,)) + return cursor.fetchone() + + # get random quote + def get_random_quote(self): + with self.conn.cursor(cursor_factory=DictCursor) as cursor: + cursor.execute("SELECT q.id, q.text, to_char(q.datetime, 'DD.MM.YYYY HH24:MI:SS') AS datetime, q.author, q.creator_id, u.name AS creator_name FROM quotes q JOIN users u ON q.creator_id = u.id ORDER BY random() LIMIT 1") + return cursor.fetchone() + + # add quote + def add_quote(self, msg): + if not self.check_editor(msg): + raise RuntimeError + quote = msg.text.strip().split("\n\n") + text = quote[0].strip() + author = "" if len(quote) == 1 else quote[1].strip() + with self.conn.cursor() as cursor: + cursor.execute("INSERT INTO quotes(text, author, creator_id) VALUES(%s, NULLIF(%s, ''), (SELECT id FROM users WHERE telegram_id=%s)) RETURNING id", (quote[0].strip(), author, msg.from_user.id)) + return cursor.fetchone()[0] + + def quotes_count(self): + with self.conn.cursor() as cursor: + cursor.execute("SELECT count(*) FROM quotes") + return cursor.fetchone()[0] diff --git a/bot/src/snb.py b/bot/src/snb.py new file mode 100755 index 0000000..002c8d0 --- /dev/null +++ b/bot/src/snb.py @@ -0,0 +1,165 @@ +from telebot import TeleBot +from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton +from configparser import ConfigParser +from db import Database +import os +import atexit +import signal +import logging as log +from time import sleep + +# set logger +log.basicConfig( + level=log.INFO, + filename="/var/log/skazanull/bot.log", + filemode="a", + format="%(asctime)s | %(levelname)s | %(message)s" +) + +# actions to do on exit +exit_actions = [] +defer = exit_actions.append +def finalize(*args): + try: + exec('\n'.join(exit_actions)) + finally: + os._exit(0) +atexit.register(finalize) +signal.signal(signal.SIGTERM, finalize) +signal.signal(signal.SIGINT, finalize) + +# load config +try: + conf = ConfigParser() + conf.read("/etc/skazanull/bot.conf") +except Exception as e: + log.critical(f"failed to load config: {str(e)}") + exit(1) + +# connect to db +try: + db = Database( + host=conf["DB"]["Host"], + port=conf["DB"]["Port"], + dbname=conf["DB"]["Name"], + user=conf["DB"]["User"], + password=conf["DB"]["Password"], + application_name="SkazaNull Telegram Bot", + ) + defer("db.close()") +except Exception as e: + log.critical(f"failed to connect to db: {str(e)}") + exit(1) + +# initialize bot +snb = TeleBot(conf["General"]["Token"]) +memo = {} + +# print help +@snb.message_handler(commands=["start", "help"]) +def helper(msg): + if not db.check_user(msg.from_user.id): + log.info(f"unauthorized access from user {msg.from_user.id}") + snb.send_message(msg.chat.id, "Я не понял, ты вообще кто такой?") + return + snb.send_message(msg.chat.id, """*SkazaNull - пацанский ботяра для пацанских цитат* + +/help - Помощь нужна тому, кто ее просит, а не тому, кто ее не просит +/random - Показать рандомную цитату из базы +/quotes - Перейти в режим просмотра пацанских цитат + +Чтобы добавить новую цитату, просто пришли ее текстовым сообщением. Авторов цитаты обязательно укажи в конце, отделив двумя переносами строк. Такие дела, бро. + +Пример оформления цитаты: +``` +- А что это за жопа? +- Так это же Гурилий! + +Биба и Боба +```""", parse_mode="markdown") + +# quote formatter +def format_quote(quote): + return "Добавил %s %s:\n```\n%s\n%s```" % (quote["creator_name"], quote["datetime"], quote["text"], f"\n© {quote['author']}" if quote["author"] else "") + +# quotes view +@snb.message_handler(commands=["quotes"]) +def quotes_handler(msg, page=0, prev_msg=None, user=None): + if not user: + user = msg.from_user.id + if not db.check_user(user): + log.info(f"unauthorized access from user {user}") + snb.send_message(msg.chat.id, "Я не понял, ты вообще че за кернел??") + return + quotes_count = db.quotes_count() + if quotes_count == 0: + snb.send_message(msg.chat.id, "А где цитаты-то?") + return + page_size = int(conf["Output"]["QuotesPerPage"]) + pages_count = quotes_count // page_size + bool(quotes_count % page_size) + prev_page = (page-1) % pages_count + next_page = (page+1) % pages_count + buttons = InlineKeyboardMarkup() + prev_button = InlineKeyboardButton("←", callback_data=f"quotes:{prev_page}") + curr_button = InlineKeyboardButton(f"{page*page_size+1}-{min(quotes_count,(page+1)*page_size)}/{quotes_count}", callback_data=":") + next_button = InlineKeyboardButton("→", callback_data=f"quotes:{next_page}") + buttons.add(prev_button, curr_button, next_button) + snb.send_message(msg.chat.id, "\n\n".join(list(map(format_quote, db.get_quotes(page_size*page, page_size)))), parse_mode="markdown", reply_markup=buttons) + if prev_msg: + snb.delete_message(msg.chat.id, prev_msg.id) + +# search quotes by keywords +@snb.message_handler(commands=["search"]) +def search_quotes(msg): + pass + +# get random quote +@snb.message_handler(commands=["random"]) +def random_quote(msg): + if not db.check_user(msg.from_user.id): + log.info(f"unauthorized access from user {msg.from_user.id}") + snb.send_message(msg.chat.id, "Я не понял, ты вообще кто такой?") + return + quote = db.get_random_quote() + if quote: + snb.send_message(msg.chat.id, format_quote(quote), parse_mode="markdown") + return + snb.send_message(msg.chat.id, "А где цитаты-то?") + +# add new quote +@snb.message_handler(content_types=["text"]) +def text_handler(msg): + if not db.check_user(msg.from_user.id): + log.info(f"unauthorized access from user {msg.from_user.id}") + snb.send_message(msg.chat.id, "Я не понял, ты вообще че за кернел??") + return + try: + qid = db.add_quote(msg) + log.info(f"quote '{qid}' added") + snb.reply_to(msg, "Цитата добавлена!") + quote = db.get_quote(qid) + for user in db.get_users(): + if user["telegram_id"] != msg.from_user.id: + snb.send_message(user["telegram_id"], "Пользователь %s только что добавил новую цитату:\n```\n%s\n%s```" % (quote["creator_name"], quote["text"], f"\n© {quote['author']}" if quote['author'] else ""), parse_mode="markdown") + log.info(f"notified user '{user['telegram_id']}' about quote '{qid}'") + except RuntimeError: + log.info(f"attempt to add quote from non-editor {msg.from_user.id}") + snb.reply_to(msg, "Сорямбус, бро, но твои права не скачаны. Со всеми вопросами - к разрабам.") + +# callbacks +@snb.callback_query_handler(lambda c: True) +def callback_handler(c): + call = c.data.split(':') + if call[0] == "quotes": + quotes_handler(c.message, page=int(call[1]), prev_msg=c.message, user=c.from_user.id) + + +if __name__ == '__main__': + log.info("snb started") + defer("log.info(\"snb stopped\")") + while 1: + try: + snb.polling() + except Exception as e: + log.error("polling stopped: " + str(e)) + sleep(1)