"""Common info needed in both command and callback handlers"""
from telegram import (
Bot,
CallbackQuery,
Chat,
InlineKeyboardMarkup,
LinkPreviewOptions,
Message,
MessageOriginChannel,
MessageOriginChat,
MessageOriginUser,
Update,
)
from telegram.error import BadRequest
from telegram.ext import CallbackContext
from spotted.data import Config, PendingPost, PublishedPost, User
from spotted.debug.log_manager import logger
from spotted.utils.keyboard_util import (
get_approve_kb,
get_post_outcome_kb,
get_published_post_kb,
)
[docs]
class EventInfo: # pylint: disable=too-many-public-methods
"""Class that contains all the relevant information related to an event"""
def __init__(
self,
bot: Bot,
ctx: CallbackContext,
update: Update = None,
message: Message = None,
query: CallbackQuery = None,
):
self.__bot = bot
self.__ctx = ctx
self.__update = update
self.__message = message
self.__query = query
@property
def bot(self) -> Bot:
"""Instance of the telegram bot"""
return self.__bot
@property
def context(self) -> CallbackContext:
"""Context generated by some event"""
return self.__ctx
@property
def update(self) -> Update:
"""Update generated by some event"""
return self.__update
@property
def message(self) -> Message:
"""Message that caused the update"""
return self.__message
@property
def bot_data(self) -> dict:
"""Data related to the bot. Is not persistent between restarts"""
return self.__ctx.bot_data
@property
def user_data(self) -> dict:
"""Data related to the user. Is not persistent between restarts"""
return self.__ctx.user_data
@property
def chat_id(self) -> int:
"""Id of the chat where the event happened"""
if self.__message is None:
return None
return self.__message.chat_id
@property
def chat_type(self) -> str:
"""Type of the chat where the event happened"""
if self.__message is None:
return None
return self.__message.chat.type
@property
def is_private_chat(self) -> bool:
"""Whether the chat is private or not"""
if self.chat_type is None:
return None
return self.chat_type == Chat.PRIVATE
@property
def text(self) -> str:
"""Text of the message that caused the update"""
if self.__message is None:
return None
return self.__message.text
@property
def callback_key(self) -> str:
"""Return the args of the message that caused the update.
If the update was caused by a callback, the callback data is splitted by ',' and returned"""
if self.__query is None or self.__query.data is None:
return ""
return self.__query.data.split(",")[0]
@property
def args(self) -> list[str]:
"""Return the args of the message that caused the update.
If the update was caused by a callback, the callback data is splitted by ',' and returned"""
# if the update was caused by a callback, the callback data is splitted by ',' and returned
if self.__query is not None and self.__query.data is not None:
args = self.__query.data.split(",")
if len(args) > 1:
return args[1:]
return []
# if the update was caused by a command, use the built-in args
if self.__ctx.args is not None:
return self.__ctx.args
# if the update was caused by a text message, split the text and take all the words but the first one
if self.text is not None:
words = self.text.split(" ")
return words[1:] if len(words) > 1 else []
return []
@property
def message_id(self) -> int:
"""Id of the message that caused the update"""
if self.__message is None:
return None
return self.__message.message_id
@property
def is_valid_message_type(self) -> bool:
"""Whether or not the type of the message is supported"""
if self.__message is None:
return False
return bool(
self.__message.text
or self.__message.photo
or self.__message.voice
or self.__message.audio
or self.__message.video
or self.__message.animation
or self.__message.sticker
or self.__message.poll
)
@property
def reply_markup(self) -> InlineKeyboardMarkup:
"""Reply_markup of the message that caused the update"""
if self.__message is None:
return None
return self.__message.reply_markup
@property
def user_id(self) -> int:
"""Id of the user that caused the update"""
if self.__query is not None:
return self.__query.from_user.id
if self.__message is not None:
return self.__message.from_user.id
return None
@property
def user_username(self) -> str:
"""Username of the user that caused the update"""
if self.__query is not None:
return self.__query.from_user.username
if self.__message is not None:
return self.__message.from_user.username
return None
@property
def user_name(self) -> str:
"""Name of the user that caused the update"""
if self.__query is not None:
return self.__query.from_user.name
if self.__message is not None:
return self.__message.from_user.name
return None
@property
def inline_keyboard(self) -> InlineKeyboardMarkup:
"""InlineKeyboard attached to the message"""
if self.__message is None:
return None
return self.__message.reply_markup
@property
def query_id(self) -> str:
"""Id of the query that caused the update"""
if self.__query is None:
return None
return self.__query.id
@property
def query_data(self) -> str:
"""Data associated with the query that caused the update"""
if self.__query is None:
return None
return self.__query.data
@property
def forward_from_id(self) -> int:
"""Id of the original message that has been forwarded"""
if self.__message is None:
return None
if isinstance(self.__message.forward_origin, MessageOriginChannel):
return self.__message.forward_origin.message_id
return None
@property
def forward_from_chat_id(self) -> int:
"""Id of the original chat the message has been forwarded from"""
if self.__message is None:
return None
if isinstance(self.__message.forward_origin, MessageOriginChannel):
return self.__message.forward_origin.chat.id
if isinstance(self.__message.forward_origin, MessageOriginChat):
return self.__message.forward_origin.sender_chat.id
if isinstance(self.__message.forward_origin, MessageOriginUser):
return self.__message.forward_origin.sender_user.id
return None
@property
def is_forward_from_channel(self) -> bool:
"""Whether the message has been forwarded from a channel"""
return isinstance(self.__message.forward_origin, MessageOriginChannel)
@property
def is_forward_from_chat(self) -> bool:
"""Whether the message has been forwarded from a chat"""
return isinstance(self.__message.forward_origin, MessageOriginChat)
@property
def is_forward_from_user(self) -> bool:
"""Whether the message has been forwarded from a user"""
return isinstance(self.__message.forward_origin, MessageOriginUser)
@property
def is_forwarded_post(self) -> bool:
"""Whether the message is in fact a forwarded post from the channel to the group"""
return (
self.chat_id == Config.post_get("community_group_id")
and isinstance(self.__message.forward_origin, MessageOriginChannel)
and self.__message.forward_origin.chat.id == Config.post_get("channel_id")
and self.__message.is_automatic_forward
)
[docs]
@classmethod
def from_message(cls, update: Update, ctx: CallbackContext) -> "EventInfo":
"""Instance of EventInfo created by a message update
Args:
update: update event
context: context passed by the handler
Returns:
instance of the class
"""
message = update.message if update.message is not None else update.edited_message
return cls(bot=ctx.bot, ctx=ctx, update=update, message=message)
[docs]
@classmethod
def from_callback(cls, update: Update, ctx: CallbackContext) -> "EventInfo":
"""Instance of EventInfo created by a callback update
Args:
update: update event
context: context passed by the handler
Returns:
instance of the class
"""
return cls(
bot=ctx.bot, ctx=ctx, update=update, message=update.callback_query.message, query=update.callback_query
)
[docs]
@classmethod
def from_job(cls, ctx: CallbackContext) -> "EventInfo":
"""Instance of EventInfo created by a job update
Args:
context: context passed by the handler
Returns:
instance of the class
"""
return cls(bot=ctx.bot, ctx=ctx)
[docs]
async def answer_callback_query(self, text: str = None):
"""Calls the answer_callback_query method of the bot class, while also handling the exception
Args:
text: Text to show to the user
"""
try:
await self.__bot.answer_callback_query(callback_query_id=self.query_id, text=text)
except BadRequest as ex:
logger.warning("On answer_callback_query: %s", ex)
[docs]
async def edit_inline_keyboard(
self, chat_id: int = None, message_id: int = None, new_keyboard: InlineKeyboardMarkup = None
):
"""Generic wrapper used to edit the inline keyboard of a message with the telegram bot,
while also handling the exception
Args:
chat_id: id of the chat the message to edit belongs to or the current chat if None
message_id: id of the message to edit. It is the current message if left None
new_keyboard: new inline keyboard to assign to the message
"""
chat_id = chat_id if chat_id is not None else self.chat_id
message_id = message_id if message_id is not None else self.message_id
try:
await self.__bot.edit_message_reply_markup(
chat_id=chat_id, message_id=message_id, reply_markup=new_keyboard
)
except BadRequest as ex:
logger.error("EventInfo.edit_inline_keyboard: %s", ex)
[docs]
async def send_post_to_admins(self) -> bool:
"""Sends the post to the admin group, so it can be approved
Returns:
whether or not the operation was successful
"""
message = self.__message.reply_to_message
admin_group_id = Config.post_get("admin_group_id")
poll = message.poll # if the message is a poll, get its reference
try:
if poll: # makes sure the poll is anonym
g_message = await self.__bot.send_poll(
chat_id=admin_group_id,
question=poll.question,
options=[option.text for option in poll.options],
type=poll.type,
allows_multiple_answers=poll.allows_multiple_answers,
correct_option_id=poll.correct_option_id,
reply_markup=get_approve_kb(),
)
elif message.text and message.entities: # maintains the previews, if present
show_preview = self.user_data.get("show_preview", True)
g_message = await self.__bot.send_message(
chat_id=admin_group_id,
text=message.text,
reply_markup=get_approve_kb(),
entities=message.entities,
link_preview_options=LinkPreviewOptions(not show_preview),
)
else:
g_message = await self.__bot.copy_message(
chat_id=admin_group_id,
from_chat_id=message.chat_id,
message_id=message.message_id,
reply_markup=get_approve_kb(),
)
except BadRequest as ex:
logger.error("Sending the post on send_post_to: %s", ex)
return False
PendingPost.create(user_message=message, admin_group_id=admin_group_id, g_message_id=g_message.message_id)
return True
[docs]
async def send_post_to_channel(self, user_id: int):
"""Sends the post to the channel, so it can be enjoyed by the users (and voted, if comments are disabled)"""
message = self.__message
channel_id = Config.post_get("channel_id")
poll = message.poll # if the message is a poll, get its reference
reply_markup = None
# ... append the voting Inline Keyboard, if comments are not to be supported
if not Config.post_get("comments"):
reply_markup = get_published_post_kb()
if poll: # makes sure the poll is anonym
c_message = await self.__bot.send_poll(
chat_id=channel_id,
question=poll.question,
options=[option.text for option in poll.options],
type=poll.type,
allows_multiple_answers=poll.allows_multiple_answers,
correct_option_id=poll.correct_option_id,
reply_markup=reply_markup,
)
else:
c_message = await self.__bot.copy_message(
chat_id=channel_id,
from_chat_id=message.chat_id,
message_id=message.message_id,
reply_markup=reply_markup,
)
if not Config.post_get("comments"): # if the user can vote directly on the post
PublishedPost.create(c_message_id=c_message.message_id, channel_id=channel_id)
else: # ... else, if comments are enabled, save the user_id, so the user can be credited
self.bot_data[f"{channel_id},{c_message.message_id}"] = user_id
[docs]
async def send_post_to_channel_group(self):
"""Sends the post to the group associated to the channel,
so that users can vote the post (if comments are enabled)
"""
message = self.__message
community_group_id = Config.post_get("community_group_id")
user_id = self.bot_data.pop(f"{self.forward_from_chat_id},{self.forward_from_id}", -1)
sign = await User(user_id).get_user_sign(bot=self.__bot)
post_message = await self.__bot.send_message(
chat_id=community_group_id,
text=f"by: {sign}",
reply_markup=get_published_post_kb(),
reply_to_message_id=message.message_id,
)
PublishedPost.create(channel_id=community_group_id, c_message_id=post_message.message_id)
[docs]
async def show_admins_votes(self, pending_post: PendingPost, reason: str | None = None):
"""After a post is been approved or rejected, shows the admins that approved or rejected it \
and edit the message to show the admin's votes
Args:
pending_post: post to show the admin's votes for
reason: reason for the rejection, currently used on autoreply
"""
inline_keyboard = await get_post_outcome_kb(
bot=self.__bot, votes=pending_post.get_list_admin_votes(), reason=reason
)
await self.__bot.edit_message_reply_markup(
chat_id=pending_post.admin_group_id, message_id=pending_post.g_message_id, reply_markup=inline_keyboard
)
remaining_pending_posts = PendingPost.get_all(admin_group_id=pending_post.admin_group_id)
# remove the post from the pending posts
remaining_pending_posts = [
post for post in remaining_pending_posts if post.g_message_id != pending_post.g_message_id
]
remaining_pending_posts.sort(key=lambda post: post.g_message_id)
remaining_pending_posts_count = len(remaining_pending_posts)
# if there are pending post, reply to the oldest one with the number of remaining pending posts
if remaining_pending_posts_count > 0:
text = f"⬆️ Post in attesa\nRimangono {remaining_pending_posts_count} post in attesa"
oldest_pending_post = remaining_pending_posts[0]
await self.__bot.send_message(
chat_id=pending_post.admin_group_id, text=text, reply_to_message_id=oldest_pending_post.g_message_id
)