init(bot): add Telegram bot

This commit is contained in:
Masahiko AMANO 2025-01-05 23:51:45 +03:00
parent bd3dae6226
commit 3e5bff2cb5
3 changed files with 235 additions and 0 deletions

7
bot/requirements.txt Normal file
View File

@ -0,0 +1,7 @@
certifi==2024.12.14
charset-normalizer==3.4.1
idna==3.10
psycopg2-binary==2.9.10
pyTelegramBotAPI==4.26.0
requests==2.32.3
urllib3==2.3.0

63
bot/src/db.py Normal file
View File

@ -0,0 +1,63 @@
from psycopg2 import connect as db_connect
from psycopg2.extras import DictCursor
class Database:
def __init__(self, **kwargs):
self.conn = db_connect(**kwargs)
self.conn.autocommit = True
def close(self):
self.conn.close()
# get all users
def get_users(self):
with self.conn.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute("SELECT * FROM users")
return cursor.fetchall()
# check if message sender is authorized
def check_user(self, uid):
with self.conn.cursor() as cursor:
cursor.execute("SELECT 1 FROM users WHERE telegram_id=%s", (uid,))
return cursor.fetchone() is not None
# check if user is an editor
def check_editor(self, msg):
with self.conn.cursor() as cursor:
cursor.execute("SELECT is_editor FROM users WHERE telegram_id=%s", (msg.from_user.id,))
return cursor.fetchone()[0]
# get quotes range
def get_quotes(self, start=0, count="ALL"):
with self.conn.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute("SELECT q.id, q.text, to_char(q.datetime, 'DD.MM.YYYY HH24:MI:SS') AS datetime, q.author, q.creator_id, u.name AS creator_name FROM quotes q JOIN users u ON q.creator_id = u.id WHERE NOT q.is_removed ORDER BY q.datetime DESC OFFSET %s LIMIT %s", (start, count))
return cursor.fetchall()
# get single quote
def get_quote(self, id):
with self.conn.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute("SELECT q.id, q.text, to_char(q.datetime, 'DD.MM.YYYY HH24:MI:SS') AS datetime, q.author, q.creator_id, u.name AS creator_name FROM quotes q JOIN users u ON q.creator_id = u.id WHERE NOT q.is_removed AND q.id=%s", (id,))
return cursor.fetchone()
# get random quote
def get_random_quote(self):
with self.conn.cursor(cursor_factory=DictCursor) as cursor:
cursor.execute("SELECT q.id, q.text, to_char(q.datetime, 'DD.MM.YYYY HH24:MI:SS') AS datetime, q.author, q.creator_id, u.name AS creator_name FROM quotes q JOIN users u ON q.creator_id = u.id ORDER BY random() LIMIT 1")
return cursor.fetchone()
# add quote
def add_quote(self, msg):
if not self.check_editor(msg):
raise RuntimeError
quote = msg.text.strip().split("\n\n")
text = quote[0].strip()
author = "" if len(quote) == 1 else quote[1].strip()
with self.conn.cursor() as cursor:
cursor.execute("INSERT INTO quotes(text, author, creator_id) VALUES(%s, NULLIF(%s, ''), (SELECT id FROM users WHERE telegram_id=%s)) RETURNING id", (quote[0].strip(), author, msg.from_user.id))
return cursor.fetchone()[0]
def quotes_count(self):
with self.conn.cursor() as cursor:
cursor.execute("SELECT count(*) FROM quotes")
return cursor.fetchone()[0]

165
bot/src/snb.py Executable file
View File

@ -0,0 +1,165 @@
from telebot import TeleBot
from telebot.types import InlineKeyboardMarkup, InlineKeyboardButton
from configparser import ConfigParser
from db import Database
import os
import atexit
import signal
import logging as log
from time import sleep
# set logger
log.basicConfig(
level=log.INFO,
filename="/var/log/skazanull/bot.log",
filemode="a",
format="%(asctime)s | %(levelname)s | %(message)s"
)
# actions to do on exit
exit_actions = []
defer = exit_actions.append
def finalize(*args):
try:
exec('\n'.join(exit_actions))
finally:
os._exit(0)
atexit.register(finalize)
signal.signal(signal.SIGTERM, finalize)
signal.signal(signal.SIGINT, finalize)
# load config
try:
conf = ConfigParser()
conf.read("/etc/skazanull/bot.conf")
except Exception as e:
log.critical(f"failed to load config: {str(e)}")
exit(1)
# connect to db
try:
db = Database(
host=conf["DB"]["Host"],
port=conf["DB"]["Port"],
dbname=conf["DB"]["Name"],
user=conf["DB"]["User"],
password=conf["DB"]["Password"],
application_name="SkazaNull Telegram Bot",
)
defer("db.close()")
except Exception as e:
log.critical(f"failed to connect to db: {str(e)}")
exit(1)
# initialize bot
snb = TeleBot(conf["General"]["Token"])
memo = {}
# print help
@snb.message_handler(commands=["start", "help"])
def helper(msg):
if not db.check_user(msg.from_user.id):
log.info(f"unauthorized access from user {msg.from_user.id}")
snb.send_message(msg.chat.id, "Я не понял, ты вообще кто такой?")
return
snb.send_message(msg.chat.id, """*SkazaNull - пацанский ботяра для пацанских цитат*
/help - Помощь нужна тому, кто ее просит, а не тому, кто ее не просит
/random - Показать рандомную цитату из базы
/quotes - Перейти в режим просмотра пацанских цитат
Чтобы добавить новую цитату, просто пришли ее текстовым сообщением. Авторов цитаты обязательно укажи в конце, отделив двумя переносами строк. Такие дела, бро.
Пример оформления цитаты:
```
- А что это за жопа?
- Так это же Гурилий!
Биба и Боба
```""", parse_mode="markdown")
# quote formatter
def format_quote(quote):
return "Добавил %s %s:\n```\n%s\n%s```" % (quote["creator_name"], quote["datetime"], quote["text"], f"\n© {quote['author']}" if quote["author"] else "")
# quotes view
@snb.message_handler(commands=["quotes"])
def quotes_handler(msg, page=0, prev_msg=None, user=None):
if not user:
user = msg.from_user.id
if not db.check_user(user):
log.info(f"unauthorized access from user {user}")
snb.send_message(msg.chat.id, "Я не понял, ты вообще че за кернел??")
return
quotes_count = db.quotes_count()
if quotes_count == 0:
snb.send_message(msg.chat.id, "А где цитаты-то?")
return
page_size = int(conf["Output"]["QuotesPerPage"])
pages_count = quotes_count // page_size + bool(quotes_count % page_size)
prev_page = (page-1) % pages_count
next_page = (page+1) % pages_count
buttons = InlineKeyboardMarkup()
prev_button = InlineKeyboardButton("", callback_data=f"quotes:{prev_page}")
curr_button = InlineKeyboardButton(f"{page*page_size+1}-{min(quotes_count,(page+1)*page_size)}/{quotes_count}", callback_data=":")
next_button = InlineKeyboardButton("", callback_data=f"quotes:{next_page}")
buttons.add(prev_button, curr_button, next_button)
snb.send_message(msg.chat.id, "\n\n".join(list(map(format_quote, db.get_quotes(page_size*page, page_size)))), parse_mode="markdown", reply_markup=buttons)
if prev_msg:
snb.delete_message(msg.chat.id, prev_msg.id)
# search quotes by keywords
@snb.message_handler(commands=["search"])
def search_quotes(msg):
pass
# get random quote
@snb.message_handler(commands=["random"])
def random_quote(msg):
if not db.check_user(msg.from_user.id):
log.info(f"unauthorized access from user {msg.from_user.id}")
snb.send_message(msg.chat.id, "Я не понял, ты вообще кто такой?")
return
quote = db.get_random_quote()
if quote:
snb.send_message(msg.chat.id, format_quote(quote), parse_mode="markdown")
return
snb.send_message(msg.chat.id, "А где цитаты-то?")
# add new quote
@snb.message_handler(content_types=["text"])
def text_handler(msg):
if not db.check_user(msg.from_user.id):
log.info(f"unauthorized access from user {msg.from_user.id}")
snb.send_message(msg.chat.id, "Я не понял, ты вообще че за кернел??")
return
try:
qid = db.add_quote(msg)
log.info(f"quote '{qid}' added")
snb.reply_to(msg, "Цитата добавлена!")
quote = db.get_quote(qid)
for user in db.get_users():
if user["telegram_id"] != msg.from_user.id:
snb.send_message(user["telegram_id"], "Пользователь %s только что добавил новую цитату:\n```\n%s\n%s```" % (quote["creator_name"], quote["text"], f"\n© {quote['author']}" if quote['author'] else ""), parse_mode="markdown")
log.info(f"notified user '{user['telegram_id']}' about quote '{qid}'")
except RuntimeError:
log.info(f"attempt to add quote from non-editor {msg.from_user.id}")
snb.reply_to(msg, "Сорямбус, бро, но твои права не скачаны. Со всеми вопросами - к разрабам.")
# callbacks
@snb.callback_query_handler(lambda c: True)
def callback_handler(c):
call = c.data.split(':')
if call[0] == "quotes":
quotes_handler(c.message, page=int(call[1]), prev_msg=c.message, user=c.from_user.id)
if __name__ == '__main__':
log.info("snb started")
defer("log.info(\"snb stopped\")")
while 1:
try:
snb.polling()
except Exception as e:
log.error("polling stopped: " + str(e))
sleep(1)