You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
355 lines
11 KiB
355 lines
11 KiB
import logging
|
|
from typing import List, Union
|
|
|
|
import discord
|
|
from redbot.core import Config, commands
|
|
from redbot.core.bot import Red
|
|
from redbot.core.commands import Cog
|
|
|
|
log = logging.getLogger("red.fox_v3.reactrestrict")
|
|
|
|
|
|
class ReactRestrictCombo:
|
|
def __init__(self, message_id, role_id):
|
|
self.message_id = message_id
|
|
self.role_id = role_id
|
|
|
|
def __eq__(self, other: "ReactRestrictCombo"):
|
|
return self.message_id == other.message_id and self.role_id == other.role_id
|
|
|
|
def to_json(self):
|
|
return {"message_id": self.message_id, "role_id": self.role_id}
|
|
|
|
@classmethod
|
|
def from_json(cls, data):
|
|
return cls(data["message_id"], data["role_id"])
|
|
|
|
|
|
class ReactRestrict(Cog):
|
|
"""
|
|
Prevent specific roles from reacting to specific messages
|
|
"""
|
|
|
|
def __init__(self, red: Red):
|
|
super().__init__()
|
|
self.bot = red
|
|
self.config = Config.get_conf(
|
|
self, 8210197991168210111511611410599116, force_registration=True
|
|
)
|
|
self.config.register_global(registered_combos=[])
|
|
|
|
async def red_delete_data_for_user(self, **kwargs):
|
|
"""Nothing to delete"""
|
|
return
|
|
|
|
async def combo_list(self) -> List[ReactRestrictCombo]:
|
|
"""
|
|
Returns a list of reactrestrict combos.
|
|
|
|
:return:
|
|
"""
|
|
cmd = self.config.registered_combos()
|
|
|
|
return [ReactRestrictCombo.from_json(data) for data in await cmd]
|
|
|
|
async def set_combo_list(self, combo_list: List[ReactRestrictCombo]):
|
|
"""
|
|
Helper method to set the list of reactrestrict combos.
|
|
|
|
:param combo_list:
|
|
:return:
|
|
"""
|
|
raw = [combo.to_json() for combo in combo_list]
|
|
await self.config.registered_combos.set(raw)
|
|
|
|
async def is_registered(self, message_id: int) -> bool:
|
|
"""
|
|
Determines if a message ID has been registered.
|
|
|
|
:param message_id:
|
|
:return:
|
|
"""
|
|
return any(message_id == combo.message_id for combo in await self.combo_list())
|
|
|
|
async def add_reactrestrict(self, message_id: int, role: discord.Role):
|
|
"""
|
|
Adds a react|role combo.
|
|
"""
|
|
# is_custom = True
|
|
# if isinstance(emoji, str):
|
|
# is_custom = False
|
|
|
|
combo = ReactRestrictCombo(message_id, role.id)
|
|
|
|
current_combos = await self.combo_list()
|
|
|
|
if combo not in current_combos:
|
|
current_combos.append(combo)
|
|
await self.set_combo_list(current_combos)
|
|
|
|
async def remove_react(self, message_id: int, role: discord.Role):
|
|
"""
|
|
Removes a given reaction
|
|
|
|
:param message_id:
|
|
:param role:
|
|
:return:
|
|
"""
|
|
current_combos = await self.combo_list()
|
|
|
|
to_keep = [
|
|
c for c in current_combos if not (c.message_id == message_id and c.role_id == role.id)
|
|
]
|
|
|
|
if to_keep != current_combos:
|
|
await self.set_combo_list(to_keep)
|
|
|
|
async def has_reactrestrict_combo(self, message_id: int) -> (bool, List[ReactRestrictCombo]):
|
|
"""
|
|
Determines if there is an existing role combo for a given message
|
|
and emoji ID.
|
|
|
|
:param message_id:
|
|
:return:
|
|
"""
|
|
if not await self.is_registered(message_id):
|
|
return False, []
|
|
|
|
combos = await self.combo_list()
|
|
|
|
ret = [c for c in combos if c.message_id == message_id]
|
|
|
|
return len(ret) > 0, ret
|
|
|
|
def _get_member(self, channel_id: int, user_id: int) -> discord.Member:
|
|
"""
|
|
Tries to get a member with the given user ID from the guild that has
|
|
the given channel ID.
|
|
|
|
:param int channel_id:
|
|
:param int user_id:
|
|
:rtype:
|
|
discord.Member
|
|
:raises LookupError:
|
|
If no such channel or member can be found.
|
|
"""
|
|
channel = self.bot.get_channel(channel_id)
|
|
if channel is None:
|
|
raise LookupError("no channel found.")
|
|
try:
|
|
member = channel.guild.get_member(user_id)
|
|
except AttributeError as e:
|
|
raise LookupError("No member found.") from e
|
|
|
|
if member is None:
|
|
raise LookupError("No member found.")
|
|
|
|
return member
|
|
|
|
@staticmethod
|
|
def _get_role(guild: discord.Guild, role_id: int) -> discord.Role:
|
|
"""
|
|
Gets a role object from the given guild with the given ID.
|
|
|
|
:param discord.Guild guild:
|
|
:param int role_id:
|
|
:rtype:
|
|
discord.Role
|
|
:raises LookupError:
|
|
If no such role exists.
|
|
"""
|
|
role = discord.utils.get(guild.roles, id=role_id)
|
|
|
|
if role is None:
|
|
raise LookupError("No role found.")
|
|
|
|
return role
|
|
|
|
async def _get_message_from_channel(
|
|
self, channel_id: int, message_id: int
|
|
) -> Union[discord.Message, None]:
|
|
"""
|
|
Tries to find a message by ID in the current guild context.
|
|
"""
|
|
channel = self.bot.get_channel(channel_id)
|
|
try:
|
|
return await channel.fetch_message(message_id)
|
|
except discord.NotFound:
|
|
pass
|
|
except AttributeError: # VoiceChannel object has no attribute 'get_message'
|
|
pass
|
|
|
|
return None
|
|
|
|
async def _get_message(
|
|
self, ctx: commands.Context, message_id: int
|
|
) -> Union[discord.Message, None]:
|
|
"""
|
|
Tries to find a message by ID in the current guild context.
|
|
|
|
:param ctx:
|
|
:param message_id:
|
|
:return:
|
|
"""
|
|
|
|
guild: discord.Guild = ctx.guild
|
|
for channel in guild.text_channels:
|
|
try:
|
|
return await channel.fetch_message(message_id)
|
|
except discord.NotFound:
|
|
pass
|
|
except AttributeError: # VoiceChannel object has no attribute 'get_message'
|
|
pass
|
|
except discord.Forbidden: # No access to channel, skip
|
|
pass
|
|
|
|
return None
|
|
|
|
@commands.group()
|
|
async def reactrestrict(self, ctx: commands.Context):
|
|
"""
|
|
Base command for this cog. Check help for the commands list.
|
|
"""
|
|
if ctx.invoked_subcommand is None:
|
|
pass
|
|
|
|
@reactrestrict.command()
|
|
async def add(self, ctx: commands.Context, message_id: int, *, role: discord.Role):
|
|
"""
|
|
Adds a reaction|role combination to a registered message, don't use quotes for the role name.
|
|
"""
|
|
message = await self._get_message(ctx, message_id)
|
|
if message is None:
|
|
await ctx.maybe_send_embed("That message doesn't seem to exist.")
|
|
return
|
|
|
|
# try:
|
|
# emoji, actual_emoji = await self._wait_for_emoji(ctx)
|
|
# except asyncio.TimeoutError:
|
|
# await ctx.send("You didn't respond in time, please redo this command.")
|
|
# return
|
|
#
|
|
# try:
|
|
# await message.add_reaction(actual_emoji)
|
|
# except discord.HTTPException:
|
|
# await ctx.send("I can't add that emoji because I'm not in the guild that"
|
|
# " owns it.")
|
|
# return
|
|
#
|
|
# noinspection PyTypeChecker
|
|
await self.add_reactrestrict(message_id, role)
|
|
|
|
await ctx.maybe_send_embed("Message|Role restriction added.")
|
|
|
|
@reactrestrict.command()
|
|
async def remove(self, ctx: commands.Context, message_id: int, role: discord.Role):
|
|
"""
|
|
Removes role associated with a given reaction.
|
|
"""
|
|
# try:
|
|
# emoji, actual_emoji = await self._wait_for_emoji(ctx)
|
|
# except asyncio.TimeoutError:
|
|
# await ctx.send("You didn't respond in time, please redo this command.")
|
|
# return
|
|
|
|
# noinspection PyTypeChecker
|
|
await self.remove_react(message_id, role)
|
|
|
|
await ctx.send("React restriction removed.")
|
|
|
|
@commands.Cog.listener()
|
|
async def on_raw_reaction_add(self, payload: discord.RawReactionActionEvent):
|
|
"""
|
|
Event handler for long term reaction watching.
|
|
"""
|
|
|
|
emoji = payload.emoji
|
|
message_id = payload.message_id
|
|
channel_id = payload.channel_id
|
|
user_id = payload.user_id
|
|
|
|
# if emoji.is_custom_emoji():
|
|
# emoji_id = emoji.id
|
|
# else:
|
|
# emoji_id = emoji.name
|
|
|
|
has_reactrestrict, combos = await self.has_reactrestrict_combo(message_id)
|
|
|
|
if not has_reactrestrict:
|
|
log.debug("Message not react restricted")
|
|
return
|
|
|
|
try:
|
|
member = self._get_member(channel_id, user_id)
|
|
except LookupError:
|
|
log.exception("Unable to get member from guild")
|
|
return
|
|
|
|
if member.bot:
|
|
log.debug("Won't remove reactions added by bots")
|
|
return
|
|
|
|
if await self.bot.cog_disabled_in_guild(self, member.guild):
|
|
return
|
|
|
|
try:
|
|
roles = [self._get_role(member.guild, c.role_id) for c in combos]
|
|
except LookupError:
|
|
log.exception("Couldn't get approved roles from combos")
|
|
return
|
|
|
|
for apprrole in roles:
|
|
if apprrole in member.roles:
|
|
log.debug("Has approved role")
|
|
return
|
|
|
|
message = await self._get_message_from_channel(channel_id, message_id)
|
|
try:
|
|
await message.remove_reaction(emoji, member)
|
|
except (discord.Forbidden, discord.NotFound, discord.HTTPException):
|
|
log.exception("Unable to remove reaction")
|
|
|
|
# try:
|
|
# await member.add_roles(*roles)
|
|
# except discord.Forbidden:
|
|
# pass
|
|
#
|
|
# async def on_raw_reaction_remove(self, emoji: discord.PartialReactionEmoji,
|
|
# message_id: int, channel_id: int, user_id: int):
|
|
# """
|
|
# Event handler for long term reaction watching.
|
|
#
|
|
# :param discord.PartialReactionEmoji emoji:
|
|
# :param int message_id:
|
|
# :param int channel_id:
|
|
# :param int user_id:
|
|
# :return:
|
|
# """
|
|
# if emoji.is_custom_emoji():
|
|
# emoji_id = emoji.id
|
|
# else:
|
|
# emoji_id = emoji.name
|
|
#
|
|
# has_reactrestrict, combos = await self.has_reactrestrict_combo(message_id, emoji_id)
|
|
#
|
|
# if not has_reactrestrict:
|
|
# return
|
|
#
|
|
# try:
|
|
# member = self._get_member(channel_id, user_id)
|
|
# except LookupError:
|
|
# return
|
|
#
|
|
# if member.bot:
|
|
# return
|
|
#
|
|
# try:
|
|
# roles = [self._get_role(member.guild, c.role_id) for c in combos]
|
|
# except LookupError:
|
|
# return
|
|
#
|
|
# try:
|
|
# await member.remove_roles(*roles)
|
|
# except discord.Forbidden:
|
|
# pass
|