Source code for spotted.utils.info_util

"""Common info needed in both command and callback handlers"""
from telegram import Bot, CallbackQuery, Chat, InlineKeyboardMarkup, Message, Update
from telegram.error import BadRequest
from telegram.ext import CallbackContext

from spotted.data import Config, PendingPost, PublishedPost, User
from spotted.debug.log_manager import logger
from spotted.utils.keyboard_util import (
    get_approve_kb,
    get_post_outcome_kb,
    get_published_post_kb,
)


[docs] class EventInfo: # pylint: disable=too-many-public-methods """Class that contains all the relevant information related to an event""" def __init__( self, bot: Bot, ctx: CallbackContext, update: Update = None, message: Message = None, query: CallbackQuery = None, ): self.__bot = bot self.__ctx = ctx self.__update = update self.__message = message self.__query = query @property def bot(self) -> Bot: """Instance of the telegram bot""" return self.__bot @property def context(self) -> CallbackContext: """Context generated by some event""" return self.__ctx @property def update(self) -> Update: """Update generated by some event""" return self.__update @property def message(self) -> Message: """Message that caused the update""" return self.__message @property def bot_data(self) -> dict: """Data related to the bot. Is not persistent between restarts""" return self.__ctx.bot_data @property def user_data(self) -> dict: """Data related to the user. Is not persistent between restarts""" return self.__ctx.user_data @property def chat_id(self) -> int: """Id of the chat where the event happened""" if self.__message is None: return None return self.__message.chat_id @property def chat_type(self) -> str: """Type of the chat where the event happened""" if self.__message is None: return None return self.__message.chat.type @property def is_private_chat(self) -> bool: """Whether the chat is private or not""" if self.chat_type is None: return None return self.chat_type == Chat.PRIVATE @property def text(self) -> str: """Text of the message that caused the update""" if self.__message is None: return None return self.__message.text @property def callback_key(self) -> str: """Return the args of the message that caused the update. If the update was caused by a callback, the callback data is splitted by ',' and returned""" if self.__query is None or self.__query.data is None: return "" return self.__query.data.split(",")[0] @property def args(self) -> list[str]: """Return the args of the message that caused the update. If the update was caused by a callback, the callback data is splitted by ',' and returned""" # if the update was caused by a callback, the callback data is splitted by ',' and returned if self.__query is not None and self.__query.data is not None: args = self.__query.data.split(",") if len(args) > 1: return args[1:] return [] # if the update was caused by a command, use the built-in args if self.__ctx.args is not None: return self.__ctx.args # if the update was caused by a text message, split the text and take all the words but the first one if self.text is not None: words = self.text.split(" ") return words[1:] if len(words) > 1 else [] return [] @property def message_id(self) -> int: """Id of the message that caused the update""" if self.__message is None: return None return self.__message.message_id @property def is_valid_message_type(self) -> bool: """Whether or not the type of the message is supported""" if self.__message is None: return False return bool( self.__message.text or self.__message.photo or self.__message.voice or self.__message.audio or self.__message.video or self.__message.animation or self.__message.sticker or self.__message.poll ) @property def reply_markup(self) -> InlineKeyboardMarkup: """Reply_markup of the message that caused the update""" if self.__message is None: return None return self.__message.reply_markup @property def user_id(self) -> int: """Id of the user that caused the update""" if self.__query is not None: return self.__query.from_user.id if self.__message is not None: return self.__message.from_user.id return None @property def user_username(self) -> str: """Username of the user that caused the update""" if self.__query is not None: return self.__query.from_user.username if self.__message is not None: return self.__message.from_user.username return None @property def user_name(self) -> str: """Name of the user that caused the update""" if self.__query is not None: return self.__query.from_user.name if self.__message is not None: return self.__message.from_user.name return None @property def inline_keyboard(self) -> InlineKeyboardMarkup: """InlineKeyboard attached to the message""" if self.__message is None: return None return self.__message.reply_markup @property def query_id(self) -> str: """Id of the query that caused the update""" if self.__query is None: return None return self.__query.id @property def query_data(self) -> str: """Data associated with the query that caused the update""" if self.__query is None: return None return self.__query.data @property def forward_from_id(self) -> int: """Id of the original message that has been forwarded""" if self.__message is None: return None return self.__message.forward_from_message_id @property def forward_from_chat_id(self) -> int: """Id of the original chat the message has been forwarded from""" if self.__message is None or self.__message.forward_from_chat is None: return None return self.__message.forward_from_chat.id @property def is_forwarded_post(self) -> bool: """Whether the message is in fact a forwarded post from the channel to the group""" return self.chat_id == Config.post_get("community_group_id") and self.forward_from_chat_id == Config.post_get( "channel_id" )
[docs] @classmethod def from_message(cls, update: Update, ctx: CallbackContext) -> "EventInfo": """Instance of EventInfo created by a message update Args: update: update event context: context passed by the handler Returns: instance of the class """ message = update.message if update.message is not None else update.edited_message return cls(bot=ctx.bot, ctx=ctx, update=update, message=message)
[docs] @classmethod def from_callback(cls, update: Update, ctx: CallbackContext) -> "EventInfo": """Instance of EventInfo created by a callback update Args: update: update event context: context passed by the handler Returns: instance of the class """ return cls( bot=ctx.bot, ctx=ctx, update=update, message=update.callback_query.message, query=update.callback_query )
[docs] @classmethod def from_job(cls, ctx: CallbackContext) -> "EventInfo": """Instance of EventInfo created by a job update Args: context: context passed by the handler Returns: instance of the class """ return cls(bot=ctx.bot, ctx=ctx)
[docs] async def answer_callback_query(self, text: str = None): """Calls the answer_callback_query method of the bot class, while also handling the exception Args: text: Text to show to the user """ try: await self.__bot.answer_callback_query(callback_query_id=self.query_id, text=text) except BadRequest as ex: logger.warning("On answer_callback_query: %s", ex)
[docs] async def edit_inline_keyboard( self, chat_id: int = None, message_id: int = None, new_keyboard: InlineKeyboardMarkup = None ): """Generic wrapper used to edit the inline keyboard of a message with the telegram bot, while also handling the exception Args: chat_id: id of the chat the message to edit belongs to or the current chat if None message_id: id of the message to edit. It is the current message if left None new_keyboard: new inline keyboard to assign to the message """ chat_id = chat_id if chat_id is not None else self.chat_id message_id = message_id if message_id is not None else self.message_id try: await self.__bot.edit_message_reply_markup( chat_id=chat_id, message_id=message_id, reply_markup=new_keyboard ) except BadRequest as ex: logger.error("EventInfo.edit_inline_keyboard: %s", ex)
[docs] async def send_post_to_admins(self) -> bool: """Sends the post to the admin group, so it can be approved Returns: whether or not the operation was successful """ message = self.__message.reply_to_message admin_group_id = Config.post_get("admin_group_id") poll = message.poll # if the message is a poll, get its reference try: if poll: # makes sure the poll is anonym g_message = await self.__bot.send_poll( chat_id=admin_group_id, question=poll.question, options=[option.text for option in poll.options], type=poll.type, allows_multiple_answers=poll.allows_multiple_answers, correct_option_id=poll.correct_option_id, reply_markup=get_approve_kb(), ) elif message.text and message.entities: # maintains the previews, if present show_preview = self.user_data.get("show_preview", True) g_message = await self.__bot.send_message( chat_id=admin_group_id, text=message.text, reply_markup=get_approve_kb(), entities=message.entities, disable_web_page_preview=not show_preview, ) else: g_message = await self.__bot.copy_message( chat_id=admin_group_id, from_chat_id=message.chat_id, message_id=message.message_id, reply_markup=get_approve_kb(), ) except BadRequest as ex: logger.error("Sending the post on send_post_to: %s", ex) return False PendingPost.create(user_message=message, admin_group_id=admin_group_id, g_message_id=g_message.message_id) return True
[docs] async def send_post_to_channel(self, user_id: int): """Sends the post to the channel, so it can be enjoyed by the users (and voted, if comments are disabled)""" message = self.__message channel_id = Config.post_get("channel_id") poll = message.poll # if the message is a poll, get its reference reply_markup = None # ... append the voting Inline Keyboard, if comments are not to be supported if not Config.post_get("comments"): reply_markup = get_published_post_kb() if poll: # makes sure the poll is anonym c_message = await self.__bot.send_poll( chat_id=channel_id, question=poll.question, options=[option.text for option in poll.options], type=poll.type, allows_multiple_answers=poll.allows_multiple_answers, correct_option_id=poll.correct_option_id, reply_markup=reply_markup, ) else: c_message = await self.__bot.copy_message( chat_id=channel_id, from_chat_id=message.chat_id, message_id=message.message_id, reply_markup=reply_markup, ) if not Config.post_get("comments"): # if the user can vote directly on the post PublishedPost.create(c_message_id=c_message.message_id, channel_id=channel_id) else: # ... else, if comments are enabled, save the user_id, so the user can be credited self.bot_data[f"{channel_id},{c_message.message_id}"] = user_id
[docs] async def send_post_to_channel_group(self): """Sends the post to the group associated to the channel, so that users can vote the post (if comments are enabled) """ message = self.__message community_group_id = Config.post_get("community_group_id") user_id = self.bot_data.pop(f"{self.forward_from_chat_id},{self.forward_from_id}", -1) sign = await User(user_id).get_user_sign(bot=self.__bot) post_message = await self.__bot.send_message( chat_id=community_group_id, text=f"by: {sign}", reply_markup=get_published_post_kb(), reply_to_message_id=message.message_id, ) PublishedPost.create(channel_id=community_group_id, c_message_id=post_message.message_id)
[docs] async def show_admins_votes(self, pending_post: PendingPost, reason: str | None = None): """After a post is been approved or rejected, shows the admins that approved or rejected it \ and edit the message to show the admin's votes Args: pending_post: post to show the admin's votes for reason: reason for the rejection, currently used on autoreply """ inline_keyboard = await get_post_outcome_kb( bot=self.__bot, votes=pending_post.get_list_admin_votes(), reason=reason ) await self.__bot.edit_message_reply_markup( chat_id=pending_post.admin_group_id, message_id=pending_post.g_message_id, reply_markup=inline_keyboard ) remaining_pending_posts = PendingPost.get_all(admin_group_id=pending_post.admin_group_id) # remove the post from the pending posts remaining_pending_posts = [ post for post in remaining_pending_posts if post.g_message_id != pending_post.g_message_id ] remaining_pending_posts.sort(key=lambda post: post.g_message_id) remaining_pending_posts_count = len(remaining_pending_posts) # if there are pending post, reply to the oldest one with the number of remaining pending posts if remaining_pending_posts_count > 0: text = f"⬆️ Post in attesa\nRimangono {remaining_pending_posts_count} post in attesa" oldest_pending_post = remaining_pending_posts[0] await self.__bot.send_message( chat_id=pending_post.admin_group_id, text=text, reply_to_message_id=oldest_pending_post.g_message_id )