Source code for spotted.handlers.job_handlers

"""Scheduled jobs of the bot"""

import io
import shutil
import sqlite3
from binascii import Error as BinasciiError
from datetime import datetime, timedelta, timezone

import pyzipper
from cryptography.fernet import Fernet
from telegram.error import BadRequest, Forbidden, TelegramError
from telegram.ext import CallbackContext

from spotted.data import Config, DbManager, PendingPost, User
from spotted.debug import logger
from spotted.utils import EventInfo


async def clean_pending_job(context: CallbackContext):
    """Reject every pending post that has been waiting longer than allowed.

    Scheduled job (documented as running each day at 05:00 UTC).
    Every pending post of the admin group older than the `remove_after_h` setting is handled as follows:
    its message is deleted from the admin group and, when that succeeds, the author is notified.
    The pending post data is deleted in any case. A summary with the number of deleted
    messages is finally sent to the admin group.

    Args:
        context: context passed by the jobqueue
    """
    info = EventInfo.from_job(context)
    admin_group_id = Config.post_get("admin_group_id")

    now = datetime.now(tz=timezone.utc)
    cutoff = now - timedelta(hours=Config.post_get("remove_after_h"))
    expired_posts = PendingPost.get_all(admin_group_id=admin_group_id, before=cutoff)

    removed = 0  # number of admin-group messages actually deleted
    for expired_post in expired_posts:
        message_id = expired_post.g_message_id
        try:
            # remove the message associated with the pending post from the admin group
            await info.bot.delete_message(chat_id=admin_group_id, message_id=message_id)
        except BadRequest as ex:
            logger.error("Deleting old pending message: %s", ex)
        else:
            removed += 1
            try:
                # let the author know the spot expired without being evaluated
                await info.bot.send_message(
                    chat_id=expired_post.user_id,
                    text="Gli admin erano sicuramente molto impegnati e non sono riusciti a valutare lo spot in tempo",
                )
            except (BadRequest, Forbidden) as ex:
                logger.warning("Notifying the user on /clean_pending: %s", ex)
        finally:
            # the stored data of the pending post is dropped whatever happened above
            expired_post.delete_post()

    await info.bot.send_message(
        chat_id=admin_group_id, text=f"Sono stati eliminati {removed} messaggi rimasti in sospeso"
    )
def get_updated_backup_path() -> str:
    """Get the path of the database file to back up, applying some transformations if needed.

    If the `backup_keep_pending` setting is truthy, the path of the database file is returned as is.
    Otherwise the database file is copied to `<db_file>.backup`, the `pending_post` table is dropped
    from the copy and the path of the copy is returned, so the backup won't contain any pending post
    and the original file is never modified.

    Returns:
        path of the database backup file
    """
    db_path = Config.debug_get("db_file")
    if Config.debug_get("backup_keep_pending"):
        return db_path

    # The transformations are applied to a copy of the database file, not to the original.
    # This may be simplified once we switch to python 3.11,
    # as the `sqlite3` module can read from a bytes stream
    backup_path = db_path + ".backup"
    shutil.copy(db_path, backup_path)

    # 1. drop the pending_post table from the backup database
    conn = sqlite3.connect(backup_path, detect_types=sqlite3.PARSE_DECLTYPES)
    try:
        # `with conn` only commits (or rolls back) the transaction, it does NOT close the connection
        with conn:
            conn.execute("DROP TABLE IF EXISTS pending_post")
    finally:
        # release the file handle before the caller reads or zips the backup copy
        conn.close()
    return backup_path
def get_zip_backup() -> bytes:
    """Build an in-memory zip archive containing the database backup.

    The file returned by `get_updated_backup_path` is stored in the archive under the name
    `spotted.sqlite3`, using deflate compression. When the `crypto_key` setting is set,
    the archive is AES-encrypted using that key (utf-8 encoded) as password.

    Returns:
        bytes of the (possibly encrypted) zip file
    """
    source_path = get_updated_backup_path()
    buffer = io.BytesIO()
    # AES encryption is enabled only when a key is configured
    encryption = pyzipper.WZ_AES if Config.debug_get("crypto_key") else None
    with pyzipper.AESZipFile(
        buffer, "w", compression=pyzipper.ZIP_DEFLATED, encryption=encryption
    ) as archive:
        if Config.debug_get("crypto_key"):
            archive.setpassword(Config.debug_get("crypto_key").encode("utf-8"))
        archive.write(source_path, arcname="spotted.sqlite3")
    # the whole content of the buffer, regardless of the current stream position
    return buffer.getvalue()
def get_backup() -> bytes:
    """Read the database backup file, encrypting its content when a key is configured.

    If the `crypto_key` setting is set, the content of the file is encrypted with Fernet
    using that key; otherwise the raw content of the file is returned in plaintext.

    Returns:
        bytes of the backup file, either encrypted or not
    """
    with open(get_updated_backup_path(), "rb") as backup_file:
        if not Config.debug_get("crypto_key"):
            # no key configured: plaintext backup
            return backup_file.read()
        fernet = Fernet(Config.debug_get("crypto_key"))
        return fernet.encrypt(backup_file.read())
async def db_backup_job(context: CallbackContext):
    """Send the latest version of the database to the backup chat.

    Scheduled job (documented as running each day at 05:00 UTC).
    The backup is built with `get_zip_backup` when the `zip_backup` setting is truthy,
    with `get_backup` otherwise, and sent as a document to the `backup_chat_id` chat.
    If that chat is not the admin group, a confirmation message is also sent to the admin group.
    If building or sending the backup fails, the admin group is notified instead.

    Args:
        context: context passed by the jobqueue
    """
    admin_group_id = Config.post_get("admin_group_id")

    try:
        use_zip = Config.debug_get("zip_backup")
        backup_chat_id = Config.debug_get("backup_chat_id")

        db_backup = get_zip_backup() if use_zip else get_backup()
        await context.bot.send_document(
            chat_id=backup_chat_id,
            document=db_backup,
            filename="spotted.backup.zip" if use_zip else "spotted.backup.sqlite3",
            caption="✅ Backup effettuato con successo",
        )
        if backup_chat_id != admin_group_id:
            await context.bot.send_message(chat_id=admin_group_id, text="✅ Backup effettuato con successo")
    except (BinasciiError, ValueError) as ex:
        # An invalid `crypto_key` may surface as a plain ValueError rather than a binascii error
        # (e.g. a Fernet key that does not decode to 32 bytes), so both are reported to the admins
        await context.bot.send_message(chat_id=admin_group_id, text=f"✖️ Impossibile effettuare il backup\n\n{ex}")
    except TelegramError as ex:
        await context.bot.send_message(chat_id=admin_group_id, text=f"✖️ Impossibile inviare il backup\n\n{ex}")
async def clean_muted_users(context: CallbackContext):
    """Remove the expired mute records from the database and unmute the related users.

    Scheduled job (documented as running each day at 05:00 UTC).
    Every row of the `muted_users` table whose `expire_date` is in the past is deleted,
    then the corresponding user is unmuted through the bot.

    Args:
        context: context passed by the jobqueue
    """
    expired_rows = DbManager.select_from(
        table_name="muted_users", select="user_id", where="expire_date < DATETIME('now')"
    )
    # nothing happens when there are no expired records
    for row in expired_rows:
        muted_id = row["user_id"]
        DbManager.delete_from(table_name="muted_users", where="user_id = %s", where_args=(muted_id,))
        await User(muted_id).unmute(context.bot)