|
|
|
@ -12,16 +12,117 @@ from redbot.core.utils.mod import get_audit_reason
|
|
|
|
|
log = logging.getLogger("red.fox_v3.ccrole")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def _get_roles_from_content(ctx, content):
|
|
|
|
|
async def get_roles_from_content(ctx, content):
|
|
|
|
|
content_list = content.split(",")
|
|
|
|
|
try:
|
|
|
|
|
role_list = [
|
|
|
|
|
role_id_list = [
|
|
|
|
|
discord.utils.get(ctx.guild.roles, name=role.strip(" ")).id for role in content_list
|
|
|
|
|
]
|
|
|
|
|
except (discord.HTTPException, AttributeError): # None.id is attribute error
|
|
|
|
|
return None
|
|
|
|
|
else:
|
|
|
|
|
return role_list
|
|
|
|
|
return role_id_list
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def transform_parameter(result, message, target):
|
|
|
|
|
"""
|
|
|
|
|
For security reasons only specific objects are allowed
|
|
|
|
|
Internals are ignored
|
|
|
|
|
Copied from redbot.cogs.customcom.CustomCommands.transform_parameter and added `target`
|
|
|
|
|
"""
|
|
|
|
|
raw_result = "{" + result + "}"
|
|
|
|
|
objects = {
|
|
|
|
|
"message": message,
|
|
|
|
|
"author": message.author,
|
|
|
|
|
"channel": message.channel,
|
|
|
|
|
"server": message.guild,
|
|
|
|
|
"guild": message.guild,
|
|
|
|
|
"target": target,
|
|
|
|
|
}
|
|
|
|
|
if result in objects:
|
|
|
|
|
return str(objects[result])
|
|
|
|
|
try:
|
|
|
|
|
first, second = result.split(".")
|
|
|
|
|
except ValueError:
|
|
|
|
|
return raw_result
|
|
|
|
|
if first in objects and not second.startswith("_"):
|
|
|
|
|
first = objects[first]
|
|
|
|
|
else:
|
|
|
|
|
return raw_result
|
|
|
|
|
return str(getattr(first, second, raw_result))
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def format_cc(cmd, message, target):
|
|
|
|
|
out = cmd["text"]
|
|
|
|
|
results = re.findall("{([^}]+)}", out)
|
|
|
|
|
for result in results:
|
|
|
|
|
param = transform_parameter(result, message, target)
|
|
|
|
|
out = out.replace("{" + result + "}", param)
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def eval_cc(cmd, message: discord.Message, ctx: commands.Context):
|
|
|
|
|
"""Does all the work"""
|
|
|
|
|
if cmd["proles"] and not (set(role.id for role in message.author.roles) & set(cmd["proles"])):
|
|
|
|
|
log.debug(f"{message.author} missing required role to execute {ctx.invoked_with}")
|
|
|
|
|
return # Not authorized, do nothing
|
|
|
|
|
|
|
|
|
|
if cmd["targeted"]:
|
|
|
|
|
view: StringView = ctx.view
|
|
|
|
|
view.skip_ws()
|
|
|
|
|
|
|
|
|
|
# guild: discord.Guild = ctx.guild
|
|
|
|
|
# print(f"Guild: {guild}")
|
|
|
|
|
|
|
|
|
|
target = view.get_quoted_word()
|
|
|
|
|
# print(f"Target: {target}")
|
|
|
|
|
|
|
|
|
|
if target:
|
|
|
|
|
# target = discord.utils.get(guild.members, mention=target)
|
|
|
|
|
try:
|
|
|
|
|
target = await commands.MemberConverter().convert(ctx, target)
|
|
|
|
|
except commands.BadArgument:
|
|
|
|
|
target = None
|
|
|
|
|
else:
|
|
|
|
|
target = None
|
|
|
|
|
|
|
|
|
|
if not target:
|
|
|
|
|
out_message = (
|
|
|
|
|
f"This custom command is targeted! @mention a target\n`"
|
|
|
|
|
f"{ctx.invoked_with} <target>`"
|
|
|
|
|
)
|
|
|
|
|
await ctx.send(out_message)
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
target = message.author
|
|
|
|
|
|
|
|
|
|
reason = get_audit_reason(message.author)
|
|
|
|
|
|
|
|
|
|
if cmd["aroles"]:
|
|
|
|
|
arole_list = [
|
|
|
|
|
discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["aroles"]
|
|
|
|
|
]
|
|
|
|
|
try:
|
|
|
|
|
await target.add_roles(*arole_list, reason=reason)
|
|
|
|
|
except discord.Forbidden:
|
|
|
|
|
log.exception(f"Permission error: Unable to add roles")
|
|
|
|
|
await ctx.send("Permission error: Unable to add roles")
|
|
|
|
|
|
|
|
|
|
if cmd["rroles"]:
|
|
|
|
|
rrole_list = [
|
|
|
|
|
discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["rroles"]
|
|
|
|
|
]
|
|
|
|
|
try:
|
|
|
|
|
await target.remove_roles(*rrole_list, reason=reason)
|
|
|
|
|
except discord.Forbidden:
|
|
|
|
|
log.exception(f"Permission error: Unable to remove roles")
|
|
|
|
|
await ctx.send("Permission error: Unable to remove roles")
|
|
|
|
|
|
|
|
|
|
if cmd["text"] is not None:
|
|
|
|
|
out_message = format_cc(cmd, message, target)
|
|
|
|
|
await ctx.send(out_message, allowed_mentions=discord.AllowedMentions())
|
|
|
|
|
else:
|
|
|
|
|
await ctx.tick()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CCRole(commands.Cog):
|
|
|
|
@ -42,6 +143,52 @@ class CCRole(commands.Cog):
|
|
|
|
|
"""Nothing to delete"""
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
async def _query_for_roles(self, ctx, message, timeout, check):
|
|
|
|
|
m: discord.Message = await ctx.send(message)
|
|
|
|
|
try:
|
|
|
|
|
await m.add_reaction("\N{WHITE HEAVY CHECK MARK}")
|
|
|
|
|
except discord.HTTPException:
|
|
|
|
|
log.exception("Unable to add reaction to ccrole setup message")
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
def reaction_check(payload):
|
|
|
|
|
return payload.message_id == m.id and payload.user_id == ctx.author.id
|
|
|
|
|
|
|
|
|
|
tasks = [
|
|
|
|
|
asyncio.ensure_future(self.bot.wait_for("message", timeout=timeout, check=check)),
|
|
|
|
|
asyncio.ensure_future(
|
|
|
|
|
self.bot.wait_for("raw_reaction_add", timeout=timeout, check=reaction_check)
|
|
|
|
|
),
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
done, pending = await asyncio.wait(
|
|
|
|
|
tasks, timeout=timeout, return_when=asyncio.FIRST_COMPLETED
|
|
|
|
|
)
|
|
|
|
|
for task in pending:
|
|
|
|
|
task.cancel()
|
|
|
|
|
|
|
|
|
|
if len(done) == 0:
|
|
|
|
|
return {'success': False, 'message': "Timed out, canceling"}
|
|
|
|
|
|
|
|
|
|
payload_or_message = done.pop().result()
|
|
|
|
|
|
|
|
|
|
# try:
|
|
|
|
|
# answer = await self.bot.wait_for("message", timeout=timeout, check=check)
|
|
|
|
|
# except asyncio.TimeoutError:
|
|
|
|
|
# return None, "Timed out, canceling"
|
|
|
|
|
|
|
|
|
|
role_list = []
|
|
|
|
|
if (
|
|
|
|
|
isinstance(payload_or_message, discord.Message)
|
|
|
|
|
and payload_or_message.content.upper() != "NONE"
|
|
|
|
|
):
|
|
|
|
|
role_list = await get_roles_from_content(ctx, payload_or_message.content)
|
|
|
|
|
if role_list is None:
|
|
|
|
|
return {'success': False, 'message': "Invalid answer, canceling"}
|
|
|
|
|
|
|
|
|
|
# Either it was a reaction or None
|
|
|
|
|
return {'success': True, 'roles': role_list, 'message': "Success"}
|
|
|
|
|
|
|
|
|
|
@commands.guild_only()
|
|
|
|
|
@commands.group()
|
|
|
|
|
async def ccrole(self, ctx: commands.Context):
|
|
|
|
@ -70,8 +217,6 @@ class CCRole(commands.Cog):
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
guild = ctx.guild
|
|
|
|
|
author = ctx.author
|
|
|
|
|
channel = ctx.channel
|
|
|
|
|
|
|
|
|
|
cmd_list = self.config.guild(guild).cmdlist
|
|
|
|
|
|
|
|
|
@ -83,64 +228,47 @@ class CCRole(commands.Cog):
|
|
|
|
|
)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
def check(m):
|
|
|
|
|
return m.author == ctx.author and m.channel == ctx.channel
|
|
|
|
|
|
|
|
|
|
# Roles to add
|
|
|
|
|
await ctx.send(
|
|
|
|
|
results = await self._query_for_roles(
|
|
|
|
|
ctx,
|
|
|
|
|
"What roles should it add? (Must be **comma separated**)\n"
|
|
|
|
|
"Say `None` to skip adding roles"
|
|
|
|
|
"Say `None` or react with tick to skip adding roles",
|
|
|
|
|
120,
|
|
|
|
|
check,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
def check(m):
|
|
|
|
|
return m.author == author and m.channel == channel
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
answer = await self.bot.wait_for("message", timeout=120, check=check)
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
await ctx.send("Timed out, canceling")
|
|
|
|
|
if not results['success']:
|
|
|
|
|
await ctx.send(results['message'])
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
arole_list = []
|
|
|
|
|
if answer.content.upper() != "NONE":
|
|
|
|
|
arole_list = await _get_roles_from_content(ctx, answer.content)
|
|
|
|
|
if arole_list is None:
|
|
|
|
|
await ctx.send("Invalid answer, canceling")
|
|
|
|
|
return
|
|
|
|
|
arole_list = results['roles']
|
|
|
|
|
|
|
|
|
|
# Roles to remove
|
|
|
|
|
await ctx.send(
|
|
|
|
|
results = await self._query_for_roles(
|
|
|
|
|
ctx,
|
|
|
|
|
"What roles should it remove? (Must be comma separated)\n"
|
|
|
|
|
"Say `None` to skip removing roles"
|
|
|
|
|
"Say `None` or react with tick to skip removing roles",
|
|
|
|
|
120,
|
|
|
|
|
check,
|
|
|
|
|
)
|
|
|
|
|
try:
|
|
|
|
|
answer = await self.bot.wait_for("message", timeout=120, check=check)
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
await ctx.send("Timed out, canceling")
|
|
|
|
|
if not results['success']:
|
|
|
|
|
await ctx.send(results['message'])
|
|
|
|
|
return
|
|
|
|
|
rrole_list = results['roles']
|
|
|
|
|
|
|
|
|
|
rrole_list = []
|
|
|
|
|
if answer.content.upper() != "NONE":
|
|
|
|
|
rrole_list = await _get_roles_from_content(ctx, answer.content)
|
|
|
|
|
if rrole_list is None:
|
|
|
|
|
await ctx.send("Invalid answer, canceling")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
# Roles to use
|
|
|
|
|
await ctx.send(
|
|
|
|
|
# Roles to allow use
|
|
|
|
|
results = await self._query_for_roles(
|
|
|
|
|
ctx,
|
|
|
|
|
"What roles are allowed to use this command? (Must be comma separated)\n"
|
|
|
|
|
"Say `None` to allow all roles"
|
|
|
|
|
"Say `None` or react with tick to allow all roles",
|
|
|
|
|
120,
|
|
|
|
|
check,
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
answer = await self.bot.wait_for("message", timeout=120, check=check)
|
|
|
|
|
except asyncio.TimeoutError:
|
|
|
|
|
await ctx.send("Timed out, canceling")
|
|
|
|
|
if not results['success']:
|
|
|
|
|
await ctx.send(results['message'])
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
prole_list = []
|
|
|
|
|
if answer.content.upper() != "NONE":
|
|
|
|
|
prole_list = await _get_roles_from_content(ctx, answer.content)
|
|
|
|
|
if prole_list is None:
|
|
|
|
|
await ctx.send("Invalid answer, canceling")
|
|
|
|
|
return
|
|
|
|
|
prole_list = results['roles']
|
|
|
|
|
|
|
|
|
|
# Selfrole
|
|
|
|
|
await ctx.send(
|
|
|
|
@ -153,12 +281,11 @@ class CCRole(commands.Cog):
|
|
|
|
|
await ctx.send("Timed out, canceling")
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if answer.content.upper() in ["Y", "YES"]:
|
|
|
|
|
targeted = True
|
|
|
|
|
await ctx.send("This command will be **`targeted`**")
|
|
|
|
|
else:
|
|
|
|
|
targeted = False
|
|
|
|
|
await ctx.send("This command will be **`selfrole`**")
|
|
|
|
|
targeted = answer.content.upper() in ["Y", "YES"]
|
|
|
|
|
|
|
|
|
|
await ctx.send(
|
|
|
|
|
"This command will be **`{}`**".format("targeted" if targeted else "selfrole")
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Message to send
|
|
|
|
|
await ctx.send(
|
|
|
|
@ -246,9 +373,8 @@ class CCRole(commands.Cog):
|
|
|
|
|
cmd_list = {k: v for k, v in cmd_list.items() if v}
|
|
|
|
|
if not cmd_list:
|
|
|
|
|
await ctx.send(
|
|
|
|
|
"There are no custom commands in this server. Use `{}ccrole add` to start adding some.".format(
|
|
|
|
|
ctx.prefix
|
|
|
|
|
)
|
|
|
|
|
f"There are no custom commands in this server.\n"
|
|
|
|
|
f"Use `{ctx.prefix}ccrole add` to start adding some."
|
|
|
|
|
)
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
@ -294,14 +420,16 @@ class CCRole(commands.Cog):
|
|
|
|
|
cmd = ctx.invoked_with
|
|
|
|
|
cmd = cmd.lower() # Continues the proud case_insentivity tradition of ccrole
|
|
|
|
|
guild = ctx.guild
|
|
|
|
|
# message = ctx.message # Unneeded since switch to `on_message_without_command` from `on_command_error`
|
|
|
|
|
|
|
|
|
|
# Unneeded since switch to `on_message_without_command` from `on_command_error`
|
|
|
|
|
# message = ctx.message
|
|
|
|
|
|
|
|
|
|
cmdlist = self.config.guild(guild).cmdlist
|
|
|
|
|
# cmd = message.content[len(prefix) :].split()[0].lower()
|
|
|
|
|
cmd = await cmdlist.get_raw(cmd, default=None)
|
|
|
|
|
|
|
|
|
|
if cmd is not None:
|
|
|
|
|
await self.eval_cc(cmd, message, ctx)
|
|
|
|
|
await eval_cc(cmd, message, ctx)
|
|
|
|
|
else:
|
|
|
|
|
log.debug(f"No custom command named {ctx.invoked_with} found")
|
|
|
|
|
|
|
|
|
@ -322,103 +450,3 @@ class CCRole(commands.Cog):
|
|
|
|
|
if content.startswith(p):
|
|
|
|
|
return p
|
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
|
|
async def eval_cc(self, cmd, message: discord.Message, ctx: commands.Context):
|
|
|
|
|
"""Does all the work"""
|
|
|
|
|
if cmd["proles"] and not (
|
|
|
|
|
set(role.id for role in message.author.roles) & set(cmd["proles"])
|
|
|
|
|
):
|
|
|
|
|
log.debug(f"{message.author} missing required role to execute {ctx.invoked_with}")
|
|
|
|
|
return # Not authorized, do nothing
|
|
|
|
|
|
|
|
|
|
if cmd["targeted"]:
|
|
|
|
|
view: StringView = ctx.view
|
|
|
|
|
view.skip_ws()
|
|
|
|
|
|
|
|
|
|
guild: discord.Guild = ctx.guild
|
|
|
|
|
# print(f"Guild: {guild}")
|
|
|
|
|
|
|
|
|
|
target = view.get_quoted_word()
|
|
|
|
|
# print(f"Target: {target}")
|
|
|
|
|
|
|
|
|
|
if target:
|
|
|
|
|
# target = discord.utils.get(guild.members, mention=target)
|
|
|
|
|
try:
|
|
|
|
|
target = await commands.MemberConverter().convert(ctx, target)
|
|
|
|
|
except commands.BadArgument:
|
|
|
|
|
target = None
|
|
|
|
|
else:
|
|
|
|
|
target = None
|
|
|
|
|
|
|
|
|
|
if not target:
|
|
|
|
|
out_message = (
|
|
|
|
|
f"This custom command is targeted! @mention a target\n`"
|
|
|
|
|
f"{ctx.invoked_with} <target>`"
|
|
|
|
|
)
|
|
|
|
|
await ctx.send(out_message)
|
|
|
|
|
return
|
|
|
|
|
else:
|
|
|
|
|
target = message.author
|
|
|
|
|
|
|
|
|
|
reason = get_audit_reason(message.author)
|
|
|
|
|
|
|
|
|
|
if cmd["aroles"]:
|
|
|
|
|
arole_list = [
|
|
|
|
|
discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["aroles"]
|
|
|
|
|
]
|
|
|
|
|
try:
|
|
|
|
|
await target.add_roles(*arole_list, reason=reason)
|
|
|
|
|
except discord.Forbidden:
|
|
|
|
|
log.exception(f"Permission error: Unable to add roles")
|
|
|
|
|
await ctx.send("Permission error: Unable to add roles")
|
|
|
|
|
|
|
|
|
|
if cmd["rroles"]:
|
|
|
|
|
rrole_list = [
|
|
|
|
|
discord.utils.get(message.guild.roles, id=roleid) for roleid in cmd["rroles"]
|
|
|
|
|
]
|
|
|
|
|
try:
|
|
|
|
|
await target.remove_roles(*rrole_list, reason=reason)
|
|
|
|
|
except discord.Forbidden:
|
|
|
|
|
log.exception(f"Permission error: Unable to remove roles")
|
|
|
|
|
await ctx.send("Permission error: Unable to remove roles")
|
|
|
|
|
|
|
|
|
|
if cmd["text"] is not None:
|
|
|
|
|
out_message = self.format_cc(cmd, message, target)
|
|
|
|
|
await ctx.send(out_message, allowed_mentions=discord.AllowedMentions())
|
|
|
|
|
else:
|
|
|
|
|
await ctx.tick()
|
|
|
|
|
|
|
|
|
|
def format_cc(self, cmd, message, target):
|
|
|
|
|
out = cmd["text"]
|
|
|
|
|
results = re.findall("{([^}]+)\}", out)
|
|
|
|
|
for result in results:
|
|
|
|
|
param = self.transform_parameter(result, message, target)
|
|
|
|
|
out = out.replace("{" + result + "}", param)
|
|
|
|
|
return out
|
|
|
|
|
|
|
|
|
|
def transform_parameter(self, result, message, target):
|
|
|
|
|
"""
|
|
|
|
|
For security reasons only specific objects are allowed
|
|
|
|
|
Internals are ignored
|
|
|
|
|
Copied from customcom.CustomCommands.transform_parameter and added `target`
|
|
|
|
|
"""
|
|
|
|
|
raw_result = "{" + result + "}"
|
|
|
|
|
objects = {
|
|
|
|
|
"message": message,
|
|
|
|
|
"author": message.author,
|
|
|
|
|
"channel": message.channel,
|
|
|
|
|
"server": message.guild,
|
|
|
|
|
"guild": message.guild,
|
|
|
|
|
"target": target,
|
|
|
|
|
}
|
|
|
|
|
if result in objects:
|
|
|
|
|
return str(objects[result])
|
|
|
|
|
try:
|
|
|
|
|
first, second = result.split(".")
|
|
|
|
|
except ValueError:
|
|
|
|
|
return raw_result
|
|
|
|
|
if first in objects and not second.startswith("_"):
|
|
|
|
|
first = objects[first]
|
|
|
|
|
else:
|
|
|
|
|
return raw_result
|
|
|
|
|
return str(getattr(first, second, raw_result))
|
|
|
|
|