From 835ea3561739886050e63795df5b5b12cf032c9a Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 17 Sep 2020 17:10:02 -0400 Subject: [PATCH] Accept tick to skip, refactoring --- ccrole/ccrole.py | 352 +++++++++++++++++++++++++---------------------- 1 file changed, 189 insertions(+), 163 deletions(-) diff --git a/ccrole/ccrole.py b/ccrole/ccrole.py index eb654b1..04697af 100644 --- a/ccrole/ccrole.py +++ b/ccrole/ccrole.py @@ -12,16 +12,117 @@ from redbot.core.utils.mod import get_audit_reason log = logging.getLogger("red.fox_v3.ccrole") -async def _get_roles_from_content(ctx, content): +async def get_roles_from_content(ctx, content): content_list = content.split(",") try: - role_list = [ + role_id_list = [ discord.utils.get(ctx.guild.roles, name=role.strip(" ")).id for role in content_list ] except (discord.HTTPException, AttributeError): # None.id is attribute error return None else: - return role_list + return role_id_list + + +def transform_parameter(result, message, target): + """ + For security reasons only specific objects are allowed + Internals are ignored + Copied from redbot.cogs.customcom.CustomCommands.transform_parameter and added `target` + """ + raw_result = "{" + result + "}" + objects = { + "message": message, + "author": message.author, + "channel": message.channel, + "server": message.guild, + "guild": message.guild, + "target": target, + } + if result in objects: + return str(objects[result]) + try: + first, second = result.split(".") + except ValueError: + return raw_result + if first in objects and not second.startswith("_"): + first = objects[first] + else: + return raw_result + return str(getattr(first, second, raw_result)) + + +def format_cc(cmd, message, target): + out = cmd["text"] + results = re.findall("{([^}]+)}", out) + for result in results: + param = transform_parameter(result, message, target) + out = out.replace("{" + result + "}", param) + return out + + +async def eval_cc(cmd, message: discord.Message, ctx: commands.Context): + """Does all the work""" + if cmd["proles"] and not (set(role.id for role in message.author.roles) & set(cmd["proles"])): + log.debug(f"{message.author} missing required role to execute {ctx.invoked_with}") + return # Not authorized, do nothing + + if cmd["targeted"]: + view: StringView = ctx.view + view.skip_ws() + + # guild: discord.Guild = ctx.guild + # print(f"Guild: {guild}") + + target = view.get_quoted_word() + # print(f"Target: {target}") + + if target: + # target = discord.utils.get(guild.members, mention=target) + try: + target = await commands.MemberConverter().convert(ctx, target) + except commands.BadArgument: + target = None + else: + target = None + + if not target: + out_message = ( + f"This custom command is targeted! @mention a target\n`" + f"{ctx.invoked_with} `" + ) + await ctx.send(out_message) + return + else: + target = message.author + + reason = get_audit_reason(message.author) + + if cmd["aroles"]: + arole_list = [ + discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["aroles"] + ] + try: + await target.add_roles(*arole_list, reason=reason) + except discord.Forbidden: + log.exception(f"Permission error: Unable to add roles") + await ctx.send("Permission error: Unable to add roles") + + if cmd["rroles"]: + rrole_list = [ + discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["rroles"] + ] + try: + await target.remove_roles(*rrole_list, reason=reason) + except discord.Forbidden: + log.exception(f"Permission error: Unable to remove roles") + await ctx.send("Permission error: Unable to remove roles") + + if cmd["text"] is not None: + out_message = format_cc(cmd, message, target) + await ctx.send(out_message, allowed_mentions=discord.AllowedMentions()) + else: + await ctx.tick() class CCRole(commands.Cog): @@ -42,6 +143,52 @@ class CCRole(commands.Cog): """Nothing to delete""" return + async def _query_for_roles(self, ctx, message, timeout, check): + m: discord.Message = await ctx.send(message) + try: + await m.add_reaction("\N{WHITE HEAVY CHECK MARK}") + except discord.HTTPException: + log.exception("Unable to add reaction to ccrole setup message") + pass + + def reaction_check(payload): + return payload.message_id == m.id and payload.user_id == ctx.author.id + + tasks = [ + asyncio.ensure_future(self.bot.wait_for("message", timeout=timeout, check=check)), + asyncio.ensure_future( + self.bot.wait_for("raw_reaction_add", timeout=timeout, check=reaction_check) + ), + ] + + done, pending = await asyncio.wait( + tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED + ) + for task in pending: + task.cancel() + + if len(done) == 0: + return None, "Timed out, canceling" + + payload_or_message = done.pop().result() + + # try: + # answer = await self.bot.wait_for("message", timeout=timeout, check=check) + # except asyncio.TimeoutError: + # return None, "Timed out, canceling" + + role_list = [] + if ( + isinstance(payload_or_message, discord.Message) + and payload_or_message.content.upper() != "NONE" + ): + role_list = await get_roles_from_content(ctx, payload_or_message.content) + if role_list is None: + return None, "Invalid answer, canceling" + + # Either it was a reaction or None + return role_list, None + @commands.guild_only() @commands.group() async def ccrole(self, ctx: commands.Context): @@ -70,8 +217,6 @@ class CCRole(commands.Cog): return guild = ctx.guild - author = ctx.author - channel = ctx.channel cmd_list = self.config.guild(guild).cmdlist @@ -83,68 +228,49 @@ class CCRole(commands.Cog): ) return + def check(m): + return m.author == ctx.author and m.channel == ctx.channel + # Roles to add - await ctx.send( + arole_list, error_message = await self._query_for_roles( + ctx, "What roles should it add? (Must be **comma separated**)\n" - "Say `None` to skip adding roles" + "Say `None` or react with tick to skip adding roles", + 120, + check, ) - - def check(m): - return m.author == author and m.channel == channel - - try: - answer = await self.bot.wait_for("message", timeout=120, check=check) - except asyncio.TimeoutError: - await ctx.send("Timed out, canceling") + if arole_list is None: + await ctx.send(error_message) return - arole_list = [] - if answer.content.upper() != "NONE": - arole_list = await _get_roles_from_content(ctx, answer.content) - if arole_list is None: - await ctx.send("Invalid answer, canceling") - return - # Roles to remove - await ctx.send( + rrole_list, error_message = await self._query_for_roles( + ctx, "What roles should it remove? (Must be comma separated)\n" - "Say `None` to skip removing roles" + "Say `None` or react with tick to skip removing roles", + 120, + check, ) - try: - answer = await self.bot.wait_for("message", timeout=120, check=check) - except asyncio.TimeoutError: - await ctx.send("Timed out, canceling") + if rrole_list is None: + await ctx.send(error_message) return - rrole_list = [] - if answer.content.upper() != "NONE": - rrole_list = await _get_roles_from_content(ctx, answer.content) - if rrole_list is None: - await ctx.send("Invalid answer, canceling") - return - - # Roles to use - await ctx.send( + # Roles to allow use + prole_list, error_message = await self._query_for_roles( + ctx, "What roles are allowed to use this command? (Must be comma separated)\n" - "Say `None` to allow all roles" + "Say `None` or react with tick to allow all roles", + 120, + check, ) - - try: - answer = await self.bot.wait_for("message", timeout=120, check=check) - except asyncio.TimeoutError: - await ctx.send("Timed out, canceling") + if prole_list is None: + await ctx.send(error_message) return - prole_list = [] - if answer.content.upper() != "NONE": - prole_list = await _get_roles_from_content(ctx, answer.content) - if prole_list is None: - await ctx.send("Invalid answer, canceling") - return - # Selfrole - await ctx.send("Is this a targeted command?(yes/no)\n" - "No will make this a selfrole command") + await ctx.send( + "Is this a targeted command?(yes/no)\n" "No will make this a selfrole command" + ) try: answer = await self.bot.wait_for("message", timeout=120, check=check) @@ -152,12 +278,11 @@ class CCRole(commands.Cog): await ctx.send("Timed out, canceling") return - if answer.content.upper() in ["Y", "YES"]: - targeted = True - await ctx.send("This command will be **`targeted`**") - else: - targeted = False - await ctx.send("This command will be **`selfrole`**") + targeted = answer.content.upper() in ["Y", "YES"] + + await ctx.send( + "This command will be **`{}`**".format("targeted" if targeted else "selfrole") + ) # Message to send await ctx.send( @@ -245,9 +370,8 @@ class CCRole(commands.Cog): cmd_list = {k: v for k, v in cmd_list.items() if v} if not cmd_list: await ctx.send( - "There are no custom commands in this server. Use `{}ccrole add` to start adding some.".format( - ctx.prefix - ) + f"There are no custom commands in this server.\n" + f"Use `{ctx.prefix}ccrole add` to start adding some." ) return @@ -293,14 +417,16 @@ class CCRole(commands.Cog): cmd = ctx.invoked_with cmd = cmd.lower() # Continues the proud case_insentivity tradition of ccrole guild = ctx.guild - # message = ctx.message # Unneeded since switch to `on_message_without_command` from `on_command_error` + + # Unneeded since switch to `on_message_without_command` from `on_command_error` + # message = ctx.message cmdlist = self.config.guild(guild).cmdlist # cmd = message.content[len(prefix) :].split()[0].lower() cmd = await cmdlist.get_raw(cmd, default=None) if cmd is not None: - await self.eval_cc(cmd, message, ctx) + await eval_cc(cmd, message, ctx) else: log.debug(f"No custom command named {ctx.invoked_with} found") @@ -321,103 +447,3 @@ class CCRole(commands.Cog): if content.startswith(p): return p raise ValueError - - async def eval_cc(self, cmd, message: discord.Message, ctx: commands.Context): - """Does all the work""" - if cmd["proles"] and not ( - set(role.id for role in message.author.roles) & set(cmd["proles"]) - ): - log.debug(f"{message.author} missing required role to execute {ctx.invoked_with}") - return # Not authorized, do nothing - - if cmd["targeted"]: - view: StringView = ctx.view - view.skip_ws() - - guild: discord.Guild = ctx.guild - # print(f"Guild: {guild}") - - target = view.get_quoted_word() - # print(f"Target: {target}") - - if target: - # target = discord.utils.get(guild.members, mention=target) - try: - target = await commands.MemberConverter().convert(ctx, target) - except commands.BadArgument: - target = None - else: - target = None - - if not target: - out_message = ( - f"This custom command is targeted! @mention a target\n`" - f"{ctx.invoked_with} `" - ) - await ctx.send(out_message) - return - else: - target = message.author - - reason = get_audit_reason(message.author) - - if cmd["aroles"]: - arole_list = [ - discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["aroles"] - ] - try: - await target.add_roles(*arole_list, reason=reason) - except discord.Forbidden: - log.exception(f"Permission error: Unable to add roles") - await ctx.send("Permission error: Unable to add roles") - - if cmd["rroles"]: - rrole_list = [ - discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["rroles"] - ] - try: - await target.remove_roles(*rrole_list, reason=reason) - except discord.Forbidden: - log.exception(f"Permission error: Unable to remove roles") - await ctx.send("Permission error: Unable to remove roles") - - if cmd["text"] is not None: - out_message = self.format_cc(cmd, message, target) - await ctx.send(out_message, allowed_mentions=discord.AllowedMentions()) - else: - await ctx.tick() - - def format_cc(self, cmd, message, target): - out = cmd["text"] - results = re.findall("{([^}]+)\}", out) - for result in results: - param = self.transform_parameter(result, message, target) - out = out.replace("{" + result + "}", param) - return out - - def transform_parameter(self, result, message, target): - """ - For security reasons only specific objects are allowed - Internals are ignored - Copied from customcom.CustomCommands.transform_parameter and added `target` - """ - raw_result = "{" + result + "}" - objects = { - "message": message, - "author": message.author, - "channel": message.channel, - "server": message.guild, - "guild": message.guild, - "target": target, - } - if result in objects: - return str(objects[result]) - try: - first, second = result.split(".") - except ValueError: - return raw_result - if first in objects and not second.startswith("_"): - first = objects[first] - else: - return raw_result - return str(getattr(first, second, raw_result))