# Source code for spotted.utils.info_util

"""Common info needed in both command and callback handlers"""

from telegram import (
    Bot,
    CallbackQuery,
    Chat,
    InlineKeyboardMarkup,
    LinkPreviewOptions,
    Message,
    MessageOriginChannel,
    MessageOriginChat,
    MessageOriginUser,
    Update,
)
from telegram.error import BadRequest
from telegram.ext import CallbackContext

from spotted.data import Config, PendingPost, PublishedPost, User
from spotted.debug.log_manager import logger
from spotted.utils.keyboard_util import (
    get_approve_kb,
    get_post_outcome_kb,
    get_published_post_kb,
)


class EventInfo:  # pylint: disable=too-many-public-methods
    """Class that contains all the relevant information related to an event

    An instance bundles the bot and the handler context with, when available,
    the update, the message and the callback query that triggered the handler.
    Instances are normally built through :meth:`from_message`,
    :meth:`from_callback` or :meth:`from_job`.

    Properties that need a message or a query never raise when it is missing:
    they return None, an empty value, or False for the ``is_forward*`` and
    ``is_valid_message_type`` checks.
    """

    def __init__(
        self,
        bot: Bot,
        ctx: CallbackContext,
        update: Update | None = None,
        message: Message | None = None,
        query: CallbackQuery | None = None,
    ):
        """Stores the pieces of information the event is made of

        Args:
            bot: instance of the telegram bot
            ctx: context passed by the handler
            update: update that generated the event, if any
            message: message related to the event, if any
            query: callback query related to the event, if any
        """
        self.__bot = bot
        self.__ctx = ctx
        self.__update = update
        self.__message = message
        self.__query = query

    @property
    def bot(self) -> Bot:
        """Instance of the telegram bot"""
        return self.__bot

    @property
    def context(self) -> CallbackContext:
        """Context generated by some event"""
        return self.__ctx

    @property
    def update(self) -> Update | None:
        """Update generated by some event"""
        return self.__update

    @property
    def message(self) -> Message | None:
        """Message that caused the update"""
        return self.__message

    @property
    def bot_data(self) -> dict:
        """Data related to the bot. Is not persistent between restarts"""
        return self.__ctx.bot_data

    @property
    def user_data(self) -> dict:
        """Data related to the user. Is not persistent between restarts"""
        return self.__ctx.user_data

    @property
    def chat_id(self) -> int | None:
        """Id of the chat where the event happened"""
        if self.__message is None:
            return None
        return self.__message.chat_id

    @property
    def chat_type(self) -> str | None:
        """Type of the chat where the event happened"""
        if self.__message is None:
            return None
        return self.__message.chat.type

    @property
    def is_private_chat(self) -> bool | None:
        """Whether the chat is private or not. None if the chat type is unknown"""
        if self.chat_type is None:
            return None
        return self.chat_type == Chat.PRIVATE

    @property
    def text(self) -> str | None:
        """Text of the message that caused the update"""
        if self.__message is None:
            return None
        return self.__message.text

    @property
    def callback_key(self) -> str:
        """Key of the callback that caused the update.
        It is the first of the ',' separated values of the callback data,
        or an empty string if the update was not caused by a callback"""
        if self.__query is None or self.__query.data is None:
            return ""
        return self.__query.data.split(",")[0]

    @property
    def args(self) -> list[str]:
        """Return the args of the message that caused the update.
        If the update was caused by a callback, the callback data is splitted by ',' and returned"""
        # callback: everything after the first ',' separated value (the callback key)
        if self.__query is not None and self.__query.data is not None:
            return self.__query.data.split(",")[1:]
        # command: use the built-in args
        if self.__ctx.args is not None:
            return self.__ctx.args
        # text message: all the space separated words but the first one
        if self.text is not None:
            return self.text.split(" ")[1:]
        return []

    @property
    def message_id(self) -> int | None:
        """Id of the message that caused the update"""
        if self.__message is None:
            return None
        return self.__message.message_id

    @property
    def is_valid_message_type(self) -> bool:
        """Whether or not the type of the message is supported"""
        message = self.__message
        if message is None:
            return False
        return bool(
            message.text
            or message.photo
            or message.voice
            or message.audio
            or message.video
            or message.animation
            or message.sticker
            or message.poll
        )

    @property
    def reply_markup(self) -> InlineKeyboardMarkup | None:
        """Reply_markup of the message that caused the update"""
        if self.__message is None:
            return None
        return self.__message.reply_markup

    @property
    def _sender(self):
        """Telegram user behind the event: the author of the query if there is one,
        otherwise the author of the message. None if neither is available"""
        if self.__query is not None:
            return self.__query.from_user
        if self.__message is not None:
            # may be None: telegram leaves from_user empty for messages sent to channels
            return self.__message.from_user
        return None

    @property
    def user_id(self) -> int | None:
        """Id of the user that caused the update"""
        sender = self._sender
        return None if sender is None else sender.id

    @property
    def user_username(self) -> str | None:
        """Username of the user that caused the update"""
        sender = self._sender
        return None if sender is None else sender.username

    @property
    def user_name(self) -> str | None:
        """Name of the user that caused the update"""
        sender = self._sender
        return None if sender is None else sender.name

    @property
    def inline_keyboard(self) -> InlineKeyboardMarkup | None:
        """InlineKeyboard attached to the message. Same value as reply_markup"""
        return self.reply_markup

    @property
    def query_id(self) -> str | None:
        """Id of the query that caused the update"""
        if self.__query is None:
            return None
        return self.__query.id

    @property
    def query_data(self) -> str | None:
        """Data associated with the query that caused the update"""
        if self.__query is None:
            return None
        return self.__query.data

    @property
    def forward_from_id(self) -> int | None:
        """Id of the original message that has been forwarded.
        Only available when the message has been forwarded from a channel"""
        if self.__message is None:
            return None
        origin = self.__message.forward_origin
        if isinstance(origin, MessageOriginChannel):
            return origin.message_id
        return None

    @property
    def forward_from_chat_id(self) -> int | None:
        """Id of the original chat the message has been forwarded from"""
        if self.__message is None:
            return None
        origin = self.__message.forward_origin
        if isinstance(origin, MessageOriginChannel):
            return origin.chat.id
        if isinstance(origin, MessageOriginChat):
            return origin.sender_chat.id
        if isinstance(origin, MessageOriginUser):
            return origin.sender_user.id
        return None

    @property
    def is_forward_from_channel(self) -> bool:
        """Whether the message has been forwarded from a channel"""
        if self.__message is None:
            return False
        return isinstance(self.__message.forward_origin, MessageOriginChannel)

    @property
    def is_forward_from_chat(self) -> bool:
        """Whether the message has been forwarded from a chat"""
        if self.__message is None:
            return False
        return isinstance(self.__message.forward_origin, MessageOriginChat)

    @property
    def is_forward_from_user(self) -> bool:
        """Whether the message has been forwarded from a user"""
        if self.__message is None:
            return False
        return isinstance(self.__message.forward_origin, MessageOriginUser)

    @property
    def is_forwarded_post(self) -> bool:
        """Whether the message is in fact a forwarded post from the channel to the group"""
        message = self.__message
        if message is None:
            return False
        origin = message.forward_origin
        # bool(): is_automatic_forward may be None, which would otherwise leak out of the 'and' chain
        return bool(
            self.chat_id == Config.post_get("community_group_id")
            and isinstance(origin, MessageOriginChannel)
            and origin.chat.id == Config.post_get("channel_id")
            and message.is_automatic_forward
        )

    @classmethod
    def from_message(cls, update: Update, ctx: CallbackContext) -> "EventInfo":
        """Instance of EventInfo created by a message update

        Args:
            update: update event
            ctx: context passed by the handler

        Returns:
            instance of the class
        """
        # an edited message is delivered in update.edited_message rather than update.message
        message = update.message if update.message is not None else update.edited_message
        return cls(bot=ctx.bot, ctx=ctx, update=update, message=message)

    @classmethod
    def from_callback(cls, update: Update, ctx: CallbackContext) -> "EventInfo":
        """Instance of EventInfo created by a callback update

        Args:
            update: update event
            ctx: context passed by the handler

        Returns:
            instance of the class
        """
        query = update.callback_query
        return cls(bot=ctx.bot, ctx=ctx, update=update, message=query.message, query=query)

    @classmethod
    def from_job(cls, ctx: CallbackContext) -> "EventInfo":
        """Instance of EventInfo created by a job update.
        The resulting instance has no update, message or query

        Args:
            ctx: context passed by the handler

        Returns:
            instance of the class
        """
        return cls(bot=ctx.bot, ctx=ctx)

    async def answer_callback_query(self, text: str | None = None):
        """Calls the answer_callback_query method of the bot class, while also handling the exception

        Args:
            text: Text to show to the user
        """
        try:
            await self.__bot.answer_callback_query(callback_query_id=self.query_id, text=text)
        except BadRequest as ex:
            logger.warning("On answer_callback_query: %s", ex)

    async def edit_inline_keyboard(
        self,
        chat_id: int | None = None,
        message_id: int | None = None,
        new_keyboard: InlineKeyboardMarkup | None = None,
    ):
        """Generic wrapper used to edit the inline keyboard of a message with the telegram bot,
        while also handling the exception

        Args:
            chat_id: id of the chat the message to edit belongs to or the current chat if None
            message_id: id of the message to edit. It is the current message if left None
            new_keyboard: new inline keyboard to assign to the message
        """
        chat_id = chat_id if chat_id is not None else self.chat_id
        message_id = message_id if message_id is not None else self.message_id
        try:
            await self.__bot.edit_message_reply_markup(
                chat_id=chat_id, message_id=message_id, reply_markup=new_keyboard
            )
        except BadRequest as ex:
            logger.error("EventInfo.edit_inline_keyboard: %s", ex)

    async def send_post_to_admins(self) -> bool:
        """Sends the post to the admin group, so it can be approved.
        The post is the message the current message is a reply to

        Returns:
            whether or not the operation was successful
        """
        message = self.__message.reply_to_message if self.__message is not None else None
        if message is None:
            # nothing to send: the event has no message or the message is not a reply
            logger.error("EventInfo.send_post_to_admins: there is no post to send to the admins")
            return False

        admin_group_id = Config.post_get("admin_group_id")
        poll = message.poll  # if the message is a poll, get its reference

        try:
            if poll:
                # the poll is re-created instead of copied, to make sure it is anonym
                # NOTE(review): relies on send_poll creating anonymous polls by default - confirm
                g_message = await self.__bot.send_poll(
                    chat_id=admin_group_id,
                    question=poll.question,
                    options=[option.text for option in poll.options],
                    type=poll.type,
                    allows_multiple_answers=poll.allows_multiple_answers,
                    correct_option_id=poll.correct_option_id,
                    reply_markup=get_approve_kb(),
                )
            elif message.text and message.entities:
                # re-sent as text, so the link preview can follow the user's preference
                show_preview = self.user_data.get("show_preview", True)
                g_message = await self.__bot.send_message(
                    chat_id=admin_group_id,
                    text=message.text,
                    reply_markup=get_approve_kb(),
                    entities=message.entities,
                    link_preview_options=LinkPreviewOptions(is_disabled=not show_preview),
                )
            else:
                g_message = await self.__bot.copy_message(
                    chat_id=admin_group_id,
                    from_chat_id=message.chat_id,
                    message_id=message.message_id,
                    reply_markup=get_approve_kb(),
                )
        except BadRequest as ex:
            logger.error("Sending the post on send_post_to: %s", ex)
            return False

        PendingPost.create(user_message=message, admin_group_id=admin_group_id, g_message_id=g_message.message_id)
        return True

    async def send_post_to_channel(self, user_id: int):
        """Sends the post to the channel, so it can be enjoyed by the users (and voted, if comments are disabled)

        Args:
            user_id: id of the author of the post. When comments are enabled it is stored in
                bot_data under the "<channel_id>,<message_id>" key, so the user can be credited later
        """
        message = self.__message
        channel_id = Config.post_get("channel_id")
        comments_enabled = Config.post_get("comments")
        poll = message.poll  # if the message is a poll, get its reference

        # the voting Inline Keyboard goes on the post itself only if comments are not to be supported
        reply_markup = None if comments_enabled else get_published_post_kb()

        if poll:
            # the poll is re-created instead of copied, to make sure it is anonym
            c_message = await self.__bot.send_poll(
                chat_id=channel_id,
                question=poll.question,
                options=[option.text for option in poll.options],
                type=poll.type,
                allows_multiple_answers=poll.allows_multiple_answers,
                correct_option_id=poll.correct_option_id,
                reply_markup=reply_markup,
            )
        else:
            c_message = await self.__bot.copy_message(
                chat_id=channel_id,
                from_chat_id=message.chat_id,
                message_id=message.message_id,
                reply_markup=reply_markup,
            )

        if comments_enabled:
            # save the user_id, so the user can be credited
            self.bot_data[f"{channel_id},{c_message.message_id}"] = user_id
        else:
            # the user can vote directly on the post
            PublishedPost.create(c_message_id=c_message.message_id, channel_id=channel_id)

    async def send_post_to_channel_group(self):
        """Sends the post to the group associated to the channel, so that users can vote the post
        (if comments are enabled)
        """
        message = self.__message
        community_group_id = Config.post_get("community_group_id")

        # retrieve (and forget) the author stored under the "<chat_id>,<message_id>" key; -1 if unknown
        user_id = self.bot_data.pop(f"{self.forward_from_chat_id},{self.forward_from_id}", -1)
        sign = await User(user_id).get_user_sign(bot=self.__bot)

        post_message = await self.__bot.send_message(
            chat_id=community_group_id,
            text=f"by: {sign}",
            reply_markup=get_published_post_kb(),
            reply_to_message_id=message.message_id,
        )

        PublishedPost.create(channel_id=community_group_id, c_message_id=post_message.message_id)

    async def show_admins_votes(self, pending_post: PendingPost, reason: str | None = None):
        """After a post is been approved or rejected, shows the admins that approved or rejected it
        and edit the message to show the admin's votes.
        If other posts are still pending, a reminder is sent as a reply to the oldest one

        Args:
            pending_post: post to show the admin's votes for
            reason: reason for the rejection, currently used on autoreply
        """
        inline_keyboard = await get_post_outcome_kb(
            bot=self.__bot, votes=pending_post.get_list_admin_votes(), reason=reason
        )
        await self.__bot.edit_message_reply_markup(
            chat_id=pending_post.admin_group_id, message_id=pending_post.g_message_id, reply_markup=inline_keyboard
        )

        # every pending post of the admin group, except the one that has just been handled
        remaining_pending_posts = [
            post
            for post in PendingPost.get_all(admin_group_id=pending_post.admin_group_id)
            if post.g_message_id != pending_post.g_message_id
        ]
        if not remaining_pending_posts:
            return

        # reply to the oldest pending post (lowest message id) with the number of remaining ones
        oldest_pending_post = min(remaining_pending_posts, key=lambda post: post.g_message_id)
        text = f"⬆️ Post in attesa\nRimangono {len(remaining_pending_posts)} post in attesa"
        await self.__bot.send_message(
            chat_id=pending_post.admin_group_id, text=text, reply_to_message_id=oldest_pending_post.g_message_id
        )