Merge pull request #132 from bobloy/fifo_initial

Release of FIFO scheduler cog
pull/134/head
bobloy 4 years ago committed by GitHub
commit cc95290290
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -12,6 +12,7 @@ Cog Function
| conquest | **Alpha** | <details><summary>Manage maps for war games and RPGs</summary>Lots of additional features are planned, currently function with simple map</details> |
| dad | **Beta** | <details><summary>Tell dad jokes</summary>Works great!</details> |
| exclusiverole | **Alpha** | <details><summary>Prevent certain roles from getting any other roles</summary>Fully functional, but pretty simple</details> |
| fifo | **Alpha** | <details><summary>Schedule commands to be run at certain times or intervals</summary>Just released, please report bugs as you find them. Only works for bot owner for now</details> |
| fight | **Incomplete** | <details><summary>Organize bracket tournaments within discord</summary>Still in-progress, a massive project</details> |
| flag | **Alpha** | <details><summary>Create temporary marks on users that expire after specified time</summary>Ported, will not import old data. Please report bugs</details> |
| forcemention | **Alpha** | <details><summary>Mentions unmentionable roles</summary>Very simple cog, mention doesn't persist</details> |

@ -0,0 +1,11 @@
from .fifo import FIFO
async def setup(bot):
cog = FIFO(bot)
bot.add_cog(cog)
await cog.initialize()
def teardown(bot):
pass

@ -0,0 +1,29 @@
from datetime import datetime
from typing import TYPE_CHECKING
from apscheduler.triggers.cron import CronTrigger
from dateutil import parser
from discord.ext.commands import BadArgument, Converter
from fifo.timezones import assemble_timezones
if TYPE_CHECKING:
DatetimeConverter = datetime
CronConverter = str
else:
class DatetimeConverter(Converter):
async def convert(self, ctx, argument) -> datetime:
dt = parser.parse(argument, fuzzy=True, tzinfos=assemble_timezones())
if dt is not None:
return dt
raise BadArgument()
class CronConverter(Converter):
async def convert(self, ctx, argument) -> str:
try:
CronTrigger.from_crontab(argument)
except ValueError:
raise BadArgument()
return argument

@ -0,0 +1,476 @@
import logging
from datetime import datetime, timedelta
from typing import Optional, Union
import discord
from apscheduler.job import Job
from apscheduler.jobstores.base import JobLookupError
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING
from redbot.core import Config, checks, commands
from redbot.core.bot import Red
from redbot.core.commands import TimedeltaConverter
from .datetime_cron_converters import CronConverter, DatetimeConverter
from .task import Task
schedule_log = logging.getLogger("red.fox_v3.fifo.scheduler")
schedule_log.setLevel(logging.DEBUG)
log = logging.getLogger("red.fox_v3.fifo")
async def _execute_task(task_state):
log.info(f"Executing {task_state=}")
task = Task(**task_state)
if await task.load_from_config():
return await task.execute()
return False
def _assemble_job_id(task_name, guild_id):
return f"{task_name}_{guild_id}"
def _disassemble_job_id(job_id: str):
return job_id.split("_")
class FIFO(commands.Cog):
"""
Simple Scheduling Cog
Named after the simplest scheduling algorithm: First In First Out
"""
def __init__(self, bot: Red):
super().__init__()
self.bot = bot
self.config = Config.get_conf(self, identifier=70737079, force_registration=True)
default_global = {"jobs": []}
default_guild = {"tasks": {}}
self.config.register_global(**default_global)
self.config.register_guild(**default_guild)
self.scheduler = None
self.jobstore = None
async def red_delete_data_for_user(self, **kwargs):
"""Nothing to delete"""
return
def cog_unload(self):
# self.scheduler.remove_all_jobs()
if self.scheduler is not None:
self.scheduler.shutdown()
async def initialize(self):
job_defaults = {"coalesce": False, "max_instances": 1}
# executors = {"default": AsyncIOExecutor()}
# Default executor is already AsyncIOExecutor
self.scheduler = AsyncIOScheduler(job_defaults=job_defaults, logger=schedule_log)
from .redconfigjobstore import RedConfigJobStore
self.jobstore = RedConfigJobStore(self.config, self.bot)
await self.jobstore.load_from_config(self.scheduler, "default")
self.scheduler.add_jobstore(self.jobstore, "default")
self.scheduler.start()
async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str):
message: discord.Message = ctx.message
message.content = ctx.prefix + command_to_parse
message.author = ctx.author
new_ctx: commands.Context = await self.bot.get_context(message)
return new_ctx.valid
async def _delete_task(self, task: Task):
job: Union[Job, None] = await self._get_job(task)
if job is not None:
job.remove()
await task.delete_self()
async def _process_task(self, task: Task):
job: Union[Job, None] = await self._get_job(task)
if job is not None:
job.reschedule(await task.get_combined_trigger())
return job
return await self._add_job(task)
async def _get_job(self, task: Task) -> Job:
return self.scheduler.get_job(_assemble_job_id(task.name, task.guild_id))
async def _add_job(self, task: Task):
return self.scheduler.add_job(
_execute_task,
args=[task.__getstate__()],
id=_assemble_job_id(task.name, task.guild_id),
trigger=await task.get_combined_trigger(),
)
async def _resume_job(self, task: Task):
try:
job = self.scheduler.resume_job(job_id=_assemble_job_id(task.name, task.guild_id))
except JobLookupError:
job = await self._process_task(task)
return job
async def _pause_job(self, task: Task):
return self.scheduler.pause_job(job_id=_assemble_job_id(task.name, task.guild_id))
async def _remove_job(self, task: Task):
return self.scheduler.remove_job(job_id=_assemble_job_id(task.name, task.guild_id))
@checks.is_owner()
@commands.guild_only()
@commands.command()
async def fifoclear(self, ctx: commands.Context):
"""Debug command to clear all current fifo data"""
self.scheduler.remove_all_jobs()
await self.config.guild(ctx.guild).tasks.clear()
await self.config.jobs.clear()
await self.config.jobs_index.clear()
await ctx.tick()
@checks.is_owner() # Will be reduced when I figure out permissions later
@commands.guild_only()
@commands.group()
async def fifo(self, ctx: commands.Context):
"""
Base command for handling scheduling of tasks
"""
if ctx.invoked_subcommand is None:
pass
@fifo.command(name="set")
async def fifo_set(
self,
ctx: commands.Context,
task_name: str,
author_or_channel: Union[discord.Member, discord.TextChannel],
):
"""
Sets a different author or in a different channel for execution of a task.
"""
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
if isinstance(author_or_channel, discord.Member):
if task.author_id == author_or_channel.id:
await ctx.maybe_send_embed("Already executing as that member")
return
await task.set_author(author_or_channel) # also saves
elif isinstance(author_or_channel, discord.TextChannel):
if task.channel_id == author_or_channel.id:
await ctx.maybe_send_embed("Already executing in that channel")
return
await task.set_channel(author_or_channel)
else:
await ctx.maybe_send_embed("Unsupported result")
return
await ctx.tick()
@fifo.command(name="resume")
async def fifo_resume(self, ctx: commands.Context, task_name: Optional[str] = None):
"""
Provide a task name to resume execution of a task.
Otherwise resumes execution of all tasks on all guilds
If the task isn't currently scheduled, will schedule it
"""
if task_name is None:
if self.scheduler.state == STATE_PAUSED:
self.scheduler.resume()
await ctx.maybe_send_embed("All task execution for all guilds has been resumed")
else:
await ctx.maybe_send_embed("Task execution is not paused, can't resume")
else:
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
if await self._resume_job(task):
await ctx.maybe_send_embed(f"Execution of {task_name=} has been resumed")
else:
await ctx.maybe_send_embed(f"Failed to resume {task_name=}")
@fifo.command(name="pause")
async def fifo_pause(self, ctx: commands.Context, task_name: Optional[str] = None):
"""
Provide a task name to pause execution of a task
Otherwise pauses execution of all tasks on all guilds
"""
if task_name is None:
if self.scheduler.state == STATE_RUNNING:
self.scheduler.pause()
await ctx.maybe_send_embed("All task execution for all guilds has been paused")
else:
await ctx.maybe_send_embed("Task execution is not running, can't pause")
else:
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
if await self._pause_job(task):
await ctx.maybe_send_embed(f"Execution of {task_name=} has been paused")
else:
await ctx.maybe_send_embed(f"Failed to pause {task_name=}")
@fifo.command(name="details")
async def fifo_details(self, ctx: commands.Context, task_name: str):
"""
Provide all the details on the specified task name
"""
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
embed = discord.Embed(title=f"Task: {task_name}")
embed.add_field(
name="Task command", value=f"{ctx.prefix}{task.get_command_str()}", inline=False
)
guild: discord.Guild = self.bot.get_guild(task.guild_id)
if guild is not None:
author: discord.Member = guild.get_member(task.author_id)
channel: discord.TextChannel = guild.get_channel(task.channel_id)
embed.add_field(name="Server", value=guild.name)
if author is not None:
embed.add_field(name="Author", value=author.mention)
if channel is not None:
embed.add_field(name="Channel", value=channel.mention)
else:
embed.add_field(name="Server", value="Server not found", inline=False)
trigger_str = "\n".join(str(t) for t in await task.get_triggers())
if trigger_str:
embed.add_field(name="Triggers", value=trigger_str, inline=False)
job = await self._get_job(task)
if job and job.next_run_time:
embed.timestamp = job.next_run_time
await ctx.send(embed=embed)
@fifo.command(name="list")
async def fifo_list(self, ctx: commands.Context, all_guilds: bool = False):
"""
Lists all current tasks and their triggers.
Do `[p]fifo list True` to see tasks from all guilds
"""
if all_guilds:
pass
else:
out = ""
all_tasks = await self.config.guild(ctx.guild).tasks()
for task_name, task_data in all_tasks.items():
out += f"{task_name}: {task_data}\n"
if out:
await ctx.maybe_send_embed(out)
else:
await ctx.maybe_send_embed("No tasks to list")
@fifo.command(name="add")
async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str):
"""
Add a new task to this guild's task list
"""
if (await self.config.guild(ctx.guild).tasks.get_raw(task_name, default=None)) is not None:
await ctx.maybe_send_embed(f"Task already exists with {task_name=}")
return
if "_" in task_name: # See _disassemble_job_id
await ctx.maybe_send_embed("Task name cannot contain underscores")
return
if not await self._check_parsable_command(ctx, command_to_execute):
await ctx.maybe_send_embed(
"Failed to parse command. Make sure not to include the prefix"
)
return
task = Task(task_name, ctx.guild.id, self.config, ctx.author.id, ctx.channel.id, self.bot)
await task.set_commmand_str(command_to_execute)
await task.save_all()
await ctx.tick()
@fifo.command(name="delete")
async def fifo_delete(self, ctx: commands.Context, task_name: str):
"""
Deletes a task from this guild's task list
"""
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
await self._delete_task(task)
await ctx.maybe_send_embed(f"Task[{task_name}] has been deleted from this guild")
@fifo.command(name="cleartriggers", aliases=["cleartrigger"])
async def fifo_cleartriggers(self, ctx: commands.Context, task_name: str):
"""
Removes all triggers from specified task
Useful to start over with new trigger
"""
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
await task.clear_triggers()
await ctx.tick()
@fifo.group(name="addtrigger", aliases=["trigger"])
async def fifo_trigger(self, ctx: commands.Context):
"""
Add a new trigger for a task from the current guild.
"""
if ctx.invoked_subcommand is None:
pass
@fifo_trigger.command(name="interval")
async def fifo_trigger_interval(
self, ctx: commands.Context, task_name: str, *, interval_str: TimedeltaConverter
):
"""
Add an interval trigger to the specified task
"""
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
result = await task.add_trigger("interval", interval_str)
if not result:
await ctx.maybe_send_embed(
"Failed to add an interval trigger to this task, see console for logs"
)
return
await task.save_data()
job: Job = await self._process_task(task)
delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo)
await ctx.maybe_send_embed(
f"Task `{task_name}` added interval of {interval_str} to its scheduled runtimes\n"
f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)"
)
@fifo_trigger.command(name="date")
async def fifo_trigger_date(
self, ctx: commands.Context, task_name: str, *, datetime_str: DatetimeConverter
):
"""
Add a "run once" datetime trigger to the specified task
"""
task = Task(task_name, ctx.guild.id, self.config)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
result = await task.add_trigger("date", datetime_str)
if not result:
await ctx.maybe_send_embed(
"Failed to add a date trigger to this task, see console for logs"
)
return
await task.save_data()
job: Job = await self._process_task(task)
delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo)
await ctx.maybe_send_embed(
f"Task `{task_name}` added {datetime_str} to its scheduled runtimes\n"
f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)"
)
@fifo_trigger.command(name="cron")
async def fifo_trigger_cron(
self, ctx: commands.Context, task_name: str, *, cron_str: CronConverter
):
"""
Add a cron "time of day" trigger to the specified task
See https://crontab.guru/ for help generating the cron_str
"""
task = Task(task_name, ctx.guild.id, self.config)
await task.load_from_config()
if task.data is None:
await ctx.maybe_send_embed(
f"Task by the name of {task_name} is not found in this guild"
)
return
result = await task.add_trigger("cron", cron_str)
if not result:
await ctx.maybe_send_embed(
"Failed to add a cron trigger to this task, see console for logs"
)
return
await task.save_data()
job: Job = await self._process_task(task)
delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo)
await ctx.maybe_send_embed(
f"Task `{task_name}` added cron_str to its scheduled runtimes\n"
f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)"
)

@ -0,0 +1,28 @@
{
"author": [
"Bobloy"
],
"min_bot_version": "3.3.0",
"description": "[ALPHA] Schedule commands to be run at certain times or intervals",
"hidden": false,
"install_msg": "Thank you for installing FIFO.\nGet started with `[p]load fifo`, then `[p]help FIFO`",
"short": "[ALPHA] Schedule commands to be run at certain times or intervals\"",
"end_user_data_statement": "This cog does not store any End User Data",
"requirements": [
"apscheduler",
"dateutil"
],
"tags": [
"bobloy",
"utilities",
"tool",
"roles",
"schedule",
"cron",
"interval",
"date",
"datetime",
"time",
"calendar"
]
}

@ -0,0 +1,183 @@
import asyncio
import base64
import logging
import pickle
from datetime import datetime
from typing import Tuple, Union
from apscheduler.job import Job
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import run_in_event_loop
from apscheduler.util import datetime_to_utc_timestamp
from redbot.core import Config
# TODO: use get_lock on config
from redbot.core.bot import Red
from redbot.core.utils import AsyncIter
log = logging.getLogger("red.fox_v3.fifo.jobstore")
log.setLevel(logging.DEBUG)
save_task_objects = []
class RedConfigJobStore(MemoryJobStore):
def __init__(self, config: Config, bot: Red):
super().__init__()
self.config = config
self.bot = bot
self.pickle_protocol = pickle.HIGHEST_PROTOCOL
self._eventloop = self.bot.loop
# TODO: self.config.jobs_index is never used,
# fine but maybe a sign of inefficient use of config
# task = asyncio.create_task(self.load_from_config())
# while not task.done():
# sleep(0.1)
# future = asyncio.ensure_future(self.load_from_config(), loop=self.bot.loop)
@run_in_event_loop
def start(self, scheduler, alias):
super().start(scheduler, alias)
async def load_from_config(self, scheduler, alias):
super().start(scheduler, alias)
_jobs = await self.config.jobs()
self._jobs = [
(await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
]
# self._jobs_index = await self.config.jobs_index.all() # Overwritten by next
self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
def _encode_job(self, job: Job):
job_state = job.__getstate__()
new_args = list(job_state["args"])
new_args[0]["config"] = None
new_args[0]["bot"] = None
job_state["args"] = tuple(new_args)
encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol))
out = {
"_id": job.id,
"next_run_time": datetime_to_utc_timestamp(job.next_run_time),
"job_state": encoded.decode("ascii"),
}
new_args = list(job_state["args"])
new_args[0]["config"] = self.config
new_args[0]["bot"] = self.bot
job_state["args"] = tuple(new_args)
# log.debug(f"Encoding job id: {job.id}\n"
# f"Encoded as: {out}")
return out
async def _decode_job(self, in_job):
if in_job is None:
return None
job_state = in_job["job_state"]
job_state = pickle.loads(base64.b64decode(job_state))
new_args = list(job_state["args"])
new_args[0]["config"] = self.config
new_args[0]["bot"] = self.bot
job_state["args"] = tuple(new_args)
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
# task_name, guild_id = _disassemble_job_id(job.id)
# task = Task(task_name, guild_id, self.config)
# await task.load_from_config()
# save_task_objects.append(task)
#
# job.func = task.execute
# log.debug(f"Decoded job id: {job.id}\n"
# f"Decoded as {job_state}")
return job
@run_in_event_loop
def add_job(self, job: Job):
if job.id in self._jobs_index:
raise ConflictingIdError(job.id)
# log.debug(f"Check job args: {job.args=}")
timestamp = datetime_to_utc_timestamp(job.next_run_time)
index = self._get_job_index(timestamp, job.id) # This is fine
self._jobs.insert(index, (job, timestamp))
self._jobs_index[job.id] = (job, timestamp)
asyncio.create_task(self._async_add_job(job, index, timestamp))
# log.debug(f"Added job: {self._jobs[index][0].args}")
async def _async_add_job(self, job, index, timestamp):
encoded_job = self._encode_job(job)
job_tuple = tuple([encoded_job, timestamp])
async with self.config.jobs() as jobs:
jobs.insert(index, job_tuple)
# await self.config.jobs_index.set_raw(job.id, value=job_tuple)
return True
@run_in_event_loop
def update_job(self, job):
old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
job.id, (None, None)
)
old_job = old_tuple[0]
old_timestamp = old_tuple[1]
if old_job is None:
raise JobLookupError(job.id)
# If the next run time has not changed, simply replace the job in its present index.
# Otherwise, reinsert the job to the list to preserve the ordering.
old_index = self._get_job_index(old_timestamp, old_job.id)
new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
asyncio.create_task(
self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
)
async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
encoded_job = self._encode_job(job)
if old_timestamp == new_timestamp:
self._jobs[old_index] = (job, new_timestamp)
async with self.config.jobs() as jobs:
jobs[old_index] = (encoded_job, new_timestamp)
else:
del self._jobs[old_index]
new_index = self._get_job_index(new_timestamp, job.id) # This is fine
self._jobs.insert(new_index, (job, new_timestamp))
async with self.config.jobs() as jobs:
del jobs[old_index]
jobs.insert(new_index, (encoded_job, new_timestamp))
self._jobs_index[old_job.id] = (job, new_timestamp)
# await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
log.debug(f"Async Updated {job.id=}")
log.debug(f"Check job args: {job.args=}")
@run_in_event_loop
def remove_job(self, job_id):
job, timestamp = self._jobs_index.get(job_id, (None, None))
if job is None:
raise JobLookupError(job_id)
index = self._get_job_index(timestamp, job_id)
del self._jobs[index]
del self._jobs_index[job.id]
asyncio.create_task(self._async_remove_job(index, job))
async def _async_remove_job(self, index, job):
async with self.config.jobs() as jobs:
del jobs[index]
# await self.config.jobs_index.clear_raw(job.id)
@run_in_event_loop
def remove_all_jobs(self):
super().remove_all_jobs()
asyncio.create_task(self._async_remove_all_jobs())
async def _async_remove_all_jobs(self):
await self.config.jobs.clear()
# await self.config.jobs_index.clear()
def shutdown(self):
"""Removes all jobs without clearing config"""
super().remove_all_jobs()

@ -0,0 +1,337 @@
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Union
import discord
from apscheduler.triggers.base import BaseTrigger
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from discord.utils import time_snowflake
from redbot.core import Config, commands
from redbot.core.bot import Red
log = logging.getLogger("red.fox_v3.fifo.task")
async def _do_nothing(*args, **kwargs):
pass
def get_trigger(data):
if data["type"] == "interval":
parsed_time = data["time_data"]
return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds)
if data["type"] == "date":
return DateTrigger(data["time_data"])
if data["type"] == "cron":
return CronTrigger.from_crontab(data["time_data"])
return False
def parse_triggers(data: Union[Dict, None]):
if data is None or not data.get("triggers", False): # No triggers
return None
if len(data["triggers"]) > 1: # Multiple triggers
return OrTrigger(get_trigger(t_data) for t_data in data["triggers"])
return get_trigger(data["triggers"][0])
class FakeMessage:
def __init__(self, message: discord.Message):
d = {k: getattr(message, k, None) for k in dir(message)}
self.__dict__.update(**d)
def neuter_message(message: FakeMessage):
message.delete = _do_nothing
message.edit = _do_nothing
message.publish = _do_nothing
message.pin = _do_nothing
message.unpin = _do_nothing
message.add_reaction = _do_nothing
message.remove_reaction = _do_nothing
message.clear_reaction = _do_nothing
message.clear_reactions = _do_nothing
message.ack = _do_nothing
return message
class Task:
default_task_data = {"triggers": [], "command_str": ""}
default_trigger = {
"type": "",
"time_data": None, # Used for Interval and Date Triggers
}
def __init__(
self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None
):
self.name = name
self.guild_id = guild_id
self.config = config
self.bot = bot
self.author_id = author_id
self.channel_id = channel_id
self.data = None
async def _encode_time_triggers(self):
if not self.data or not self.data.get("triggers", None):
return []
triggers = []
for t in self.data["triggers"]:
if t["type"] == "interval": # Convert into timedelta
td: timedelta = t["time_data"]
triggers.append(
{"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}}
)
continue
if t["type"] == "date": # Convert into datetime
dt: datetime = t["time_data"]
triggers.append({"type": t["type"], "time_data": dt.isoformat()})
# triggers.append(
# {
# "type": t["type"],
# "time_data": {
# "year": dt.year,
# "month": dt.month,
# "day": dt.day,
# "hour": dt.hour,
# "minute": dt.minute,
# "second": dt.second,
# "tzinfo": dt.tzinfo,
# },
# }
# )
continue
if t["type"] == "cron":
triggers.append(t) # already a string, nothing to do
continue
raise NotImplemented
return triggers
async def _decode_time_triggers(self):
if not self.data or not self.data.get("triggers", None):
return
for n, t in enumerate(self.data["triggers"]):
if t["type"] == "interval": # Convert into timedelta
self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"])
continue
if t["type"] == "date": # Convert into datetime
# self.data["triggers"][n]["time_data"] = datetime(**t["time_data"])
self.data["triggers"][n]["time_data"] = datetime.fromisoformat(t["time_data"])
continue
if t["type"] == "cron":
continue # already a string
raise NotImplemented
# async def load_from_data(self, data: Dict):
# self.data = data.copy()
async def load_from_config(self):
data = await self.config.guild_from_id(self.guild_id).tasks.get_raw(
self.name, default=None
)
if not data:
return
self.author_id = data["author_id"]
self.guild_id = data["guild_id"]
self.channel_id = data["channel_id"]
self.data = data["data"]
await self._decode_time_triggers()
return self.data
async def get_triggers(self) -> List[Union[IntervalTrigger, DateTrigger]]:
if not self.data:
await self.load_from_config()
if self.data is None or "triggers" not in self.data: # No triggers
return []
return [get_trigger(t) for t in self.data["triggers"]]
async def get_combined_trigger(self) -> Union[BaseTrigger, None]:
if not self.data:
await self.load_from_config()
return parse_triggers(self.data)
# async def set_job_id(self, job_id):
# if self.data is None:
# await self.load_from_config()
#
# self.data["job_id"] = job_id
async def save_all(self):
"""To be used when creating an new task"""
data_to_save = self.default_task_data.copy()
if self.data:
data_to_save["command_str"] = self.get_command_str()
data_to_save["triggers"] = await self._encode_time_triggers()
to_save = {
"guild_id": self.guild_id,
"author_id": self.author_id,
"channel_id": self.channel_id,
"data": data_to_save,
}
await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save)
async def save_data(self):
"""To be used when updating triggers"""
if not self.data:
return
data_to_save = self.data.copy()
data_to_save["triggers"] = await self._encode_time_triggers()
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
self.name, "data", value=data_to_save
)
async def execute(self):
if not self.data or not self.get_command_str():
log.warning(f"Could not execute task due to data problem: {self.data=}")
return False
guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix
if guild is None:
log.warning(f"Could not execute task due to missing guild: {self.guild_id}")
return False
channel: discord.TextChannel = guild.get_channel(self.channel_id)
if channel is None:
log.warning(f"Could not execute task due to missing channel: {self.channel_id}")
return False
author: discord.User = guild.get_member(self.author_id)
if author is None:
log.warning(f"Could not execute task due to missing author: {self.author_id}")
return False
actual_message: discord.Message = channel.last_message
# I'd like to present you my chain of increasingly desperate message fetching attempts
if actual_message is None:
# log.warning("No message found in channel cache yet, skipping execution")
# return
actual_message = await channel.fetch_message(channel.last_message_id)
if actual_message is None: # last_message_id was an invalid message I guess
actual_message = await channel.history(limit=1).flatten()
if not actual_message: # Basically only happens if the channel has no messages
actual_message = await author.history(limit=1).flatten()
if not actual_message: # Okay, the *author* has never sent a message?
log.warning("No message found in channel cache yet, skipping execution")
return
actual_message = actual_message[0]
message = FakeMessage(actual_message)
# message = FakeMessage2
message.author = author
message.guild = guild # Just in case we got desperate
message.channel = channel
message.id = time_snowflake(datetime.now()) # Pretend to be now
message = neuter_message(message)
# absolutely weird that this takes a message object instead of guild
prefixes = await self.bot.get_prefix(message)
if isinstance(prefixes, str):
prefix = prefixes
else:
prefix = prefixes[0]
message.content = f"{prefix}{self.get_command_str()}"
if not message.guild or not message.author or not message.content:
log.warning(f"Could not execute task due to message problem: {message}")
return False
new_ctx: commands.Context = await self.bot.get_context(message)
new_ctx.assume_yes = True
if not new_ctx.valid:
log.warning(
f"Could not execute Task[{self.name}] due invalid context: {new_ctx.invoked_with}"
)
return False
await self.bot.invoke(new_ctx)
return True
async def set_bot(self, bot: Red):
self.bot = bot
async def set_author(self, author: Union[discord.User, discord.Member, str]):
self.author_id = getattr(author, "id", None) or author
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
self.name, "author_id", value=self.author_id
)
async def set_channel(self, channel: Union[discord.TextChannel, str]):
self.channel_id = getattr(channel, "id", None) or channel
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
self.name, "channel_id", value=self.channel_id
)
def get_command_str(self):
return self.data.get("command_str", "")
async def set_commmand_str(self, command_str):
if not self.data:
self.data = self.default_task_data.copy()
self.data["command_str"] = command_str
return True
async def add_trigger(self, param, parsed_time: Union[timedelta, datetime, str]):
trigger_data = {"type": param, "time_data": parsed_time}
if not get_trigger(trigger_data):
return False
if not self.data:
self.data = self.default_task_data.copy()
self.data["triggers"].append(trigger_data)
return True
def __setstate__(self, task_state):
self.name = task_state["name"]
self.guild_id = task_state["guild_id"]
self.config = task_state["config"]
self.bot = None
self.author_id = None
self.channel_id = None
self.data = None
def __getstate__(self):
return {
"name": self.name,
"guild_id": self.guild_id,
"config": self.config,
"bot": self.bot,
}
async def clear_triggers(self):
self.data["triggers"] = []
await self.save_data()
async def delete_self(self):
"""Hopefully nothing uses the object after running this..."""
await self.config.guild_from_id(self.guild_id).tasks.clear_raw(self.name)

@ -0,0 +1,195 @@
"""
Timezone information for the dateutil parser
All credit to https://github.com/prefrontal/dateutil-parser-timezones
"""
from dateutil.tz import gettz
def assemble_timezones():
"""
Assembles a dictionary of timezone abbreviations and values
:return: Dictionary of abbreviation keys and timezone values
"""
timezones = {}
timezones['ACDT'] = gettz('Australia/Darwin') # Australian Central Daylight Savings Time (UTC+10:30)
timezones['ACST'] = gettz('Australia/Darwin') # Australian Central Standard Time (UTC+09:30)
timezones['ACT'] = gettz('Brazil/Acre') # Acre Time (UTC05)
timezones['ADT'] = gettz('America/Halifax') # Atlantic Daylight Time (UTC03)
timezones['AEDT'] = gettz('Australia/Sydney') # Australian Eastern Daylight Savings Time (UTC+11)
timezones['AEST'] = gettz('Australia/Sydney') # Australian Eastern Standard Time (UTC+10)
timezones['AFT'] = gettz('Asia/Kabul') # Afghanistan Time (UTC+04:30)
timezones['AKDT'] = gettz('America/Juneau') # Alaska Daylight Time (UTC08)
timezones['AKST'] = gettz('America/Juneau') # Alaska Standard Time (UTC09)
timezones['AMST'] = gettz('America/Manaus') # Amazon Summer Time (Brazil)[1] (UTC03)
timezones['AMT'] = gettz('America/Manaus') # Amazon Time (Brazil)[2] (UTC04)
timezones['ART'] = gettz('America/Cordoba') # Argentina Time (UTC03)
timezones['AST'] = gettz('Asia/Riyadh') # Arabia Standard Time (UTC+03)
timezones['AWST'] = gettz('Australia/Perth') # Australian Western Standard Time (UTC+08)
timezones['AZOST'] = gettz('Atlantic/Azores') # Azores Summer Time (UTC±00)
timezones['AZOT'] = gettz('Atlantic/Azores') # Azores Standard Time (UTC01)
timezones['AZT'] = gettz('Asia/Baku') # Azerbaijan Time (UTC+04)
timezones['BDT'] = gettz('Asia/Brunei') # Brunei Time (UTC+08)
timezones['BIOT'] = gettz('Etc/GMT+6') # British Indian Ocean Time (UTC+06)
timezones['BIT'] = gettz('Pacific/Funafuti') # Baker Island Time (UTC12)
timezones['BOT'] = gettz('America/La_Paz') # Bolivia Time (UTC04)
timezones['BRST'] = gettz('America/Sao_Paulo') # Brasilia Summer Time (UTC02)
timezones['BRT'] = gettz('America/Sao_Paulo') # Brasilia Time (UTC03)
timezones['BST'] = gettz('Asia/Dhaka') # Bangladesh Standard Time (UTC+06)
timezones['BTT'] = gettz('Asia/Thimphu') # Bhutan Time (UTC+06)
timezones['CAT'] = gettz('Africa/Harare') # Central Africa Time (UTC+02)
timezones['CCT'] = gettz('Indian/Cocos') # Cocos Islands Time (UTC+06:30)
timezones['CDT'] = gettz('America/Chicago') # Central Daylight Time (North America) (UTC05)
timezones['CEST'] = gettz('Europe/Berlin') # Central European Summer Time (Cf. HAEC) (UTC+02)
timezones['CET'] = gettz('Europe/Berlin') # Central European Time (UTC+01)
timezones['CHADT'] = gettz('Pacific/Chatham') # Chatham Daylight Time (UTC+13:45)
timezones['CHAST'] = gettz('Pacific/Chatham') # Chatham Standard Time (UTC+12:45)
timezones['CHOST'] = gettz('Asia/Choibalsan') # Choibalsan Summer Time (UTC+09)
timezones['CHOT'] = gettz('Asia/Choibalsan') # Choibalsan Standard Time (UTC+08)
timezones['CHST'] = gettz('Pacific/Guam') # Chamorro Standard Time (UTC+10)
timezones['CHUT'] = gettz('Pacific/Chuuk') # Chuuk Time (UTC+10)
timezones['CIST'] = gettz('Etc/GMT-8') # Clipperton Island Standard Time (UTC08)
timezones['CIT'] = gettz('Asia/Makassar') # Central Indonesia Time (UTC+08)
timezones['CKT'] = gettz('Pacific/Rarotonga') # Cook Island Time (UTC10)
timezones['CLST'] = gettz('America/Santiago') # Chile Summer Time (UTC03)
timezones['CLT'] = gettz('America/Santiago') # Chile Standard Time (UTC04)
timezones['COST'] = gettz('America/Bogota') # Colombia Summer Time (UTC04)
timezones['COT'] = gettz('America/Bogota') # Colombia Time (UTC05)
timezones['CST'] = gettz('America/Chicago') # Central Standard Time (North America) (UTC06)
timezones['CT'] = gettz('Asia/Chongqing') # China time (UTC+08)
timezones['CVT'] = gettz('Atlantic/Cape_Verde') # Cape Verde Time (UTC01)
timezones['CXT'] = gettz('Indian/Christmas') # Christmas Island Time (UTC+07)
timezones['DAVT'] = gettz('Antarctica/Davis') # Davis Time (UTC+07)
timezones['DDUT'] = gettz('Antarctica/DumontDUrville') # Dumont d'Urville Time (UTC+10)
timezones['DFT'] = gettz('Europe/Berlin') # AIX equivalent of Central European Time (UTC+01)
timezones['EASST'] = gettz('Chile/EasterIsland') # Easter Island Summer Time (UTC05)
timezones['EAST'] = gettz('Chile/EasterIsland') # Easter Island Standard Time (UTC06)
timezones['EAT'] = gettz('Africa/Mogadishu') # East Africa Time (UTC+03)
timezones['ECT'] = gettz('America/Guayaquil') # Ecuador Time (UTC05)
timezones['EDT'] = gettz('America/New_York') # Eastern Daylight Time (North America) (UTC04)
timezones['EEST'] = gettz('Europe/Bucharest') # Eastern European Summer Time (UTC+03)
timezones['EET'] = gettz('Europe/Bucharest') # Eastern European Time (UTC+02)
timezones['EGST'] = gettz('America/Scoresbysund') # Eastern Greenland Summer Time (UTC±00)
timezones['EGT'] = gettz('America/Scoresbysund') # Eastern Greenland Time (UTC01)
timezones['EIT'] = gettz('Asia/Jayapura') # Eastern Indonesian Time (UTC+09)
timezones['EST'] = gettz('America/New_York') # Eastern Standard Time (North America) (UTC05)
timezones['FET'] = gettz('Europe/Minsk') # Further-eastern European Time (UTC+03)
timezones['FJT'] = gettz('Pacific/Fiji') # Fiji Time (UTC+12)
timezones['FKST'] = gettz('Atlantic/Stanley') # Falkland Islands Summer Time (UTC03)
timezones['FKT'] = gettz('Atlantic/Stanley') # Falkland Islands Time (UTC04)
timezones['FNT'] = gettz('Brazil/DeNoronha') # Fernando de Noronha Time (UTC02)
timezones['GALT'] = gettz('Pacific/Galapagos') # Galapagos Time (UTC06)
timezones['GAMT'] = gettz('Pacific/Gambier') # Gambier Islands (UTC09)
timezones['GET'] = gettz('Asia/Tbilisi') # Georgia Standard Time (UTC+04)
timezones['GFT'] = gettz('America/Cayenne') # French Guiana Time (UTC03)
timezones['GILT'] = gettz('Pacific/Tarawa') # Gilbert Island Time (UTC+12)
timezones['GIT'] = gettz('Pacific/Gambier') # Gambier Island Time (UTC09)
timezones['GMT'] = gettz('GMT') # Greenwich Mean Time (UTC±00)
timezones['GST'] = gettz('Asia/Muscat') # Gulf Standard Time (UTC+04)
timezones['GYT'] = gettz('America/Guyana') # Guyana Time (UTC04)
timezones['HADT'] = gettz('Pacific/Honolulu') # Hawaii-Aleutian Daylight Time (UTC09)
timezones['HAEC'] = gettz('Europe/Paris') # Heure Avancée d'Europe Centrale (CEST) (UTC+02)
timezones['HAST'] = gettz('Pacific/Honolulu') # Hawaii-Aleutian Standard Time (UTC10)
timezones['HKT'] = gettz('Asia/Hong_Kong') # Hong Kong Time (UTC+08)
timezones['HMT'] = gettz('Indian/Kerguelen') # Heard and McDonald Islands Time (UTC+05)
timezones['HOVST'] = gettz('Asia/Hovd') # Khovd Summer Time (UTC+08)
timezones['HOVT'] = gettz('Asia/Hovd') # Khovd Standard Time (UTC+07)
timezones['ICT'] = gettz('Asia/Ho_Chi_Minh') # Indochina Time (UTC+07)
timezones['IDT'] = gettz('Asia/Jerusalem') # Israel Daylight Time (UTC+03)
timezones['IOT'] = gettz('Etc/GMT+3') # Indian Ocean Time (UTC+03)
timezones['IRDT'] = gettz('Asia/Tehran') # Iran Daylight Time (UTC+04:30)
timezones['IRKT'] = gettz('Asia/Irkutsk') # Irkutsk Time (UTC+08)
timezones['IRST'] = gettz('Asia/Tehran') # Iran Standard Time (UTC+03:30)
timezones['IST'] = gettz('Asia/Kolkata') # Indian Standard Time (UTC+05:30)
timezones['JST'] = gettz('Asia/Tokyo') # Japan Standard Time (UTC+09)
timezones['KGT'] = gettz('Asia/Bishkek') # Kyrgyzstan time (UTC+06)
timezones['KOST'] = gettz('Pacific/Kosrae') # Kosrae Time (UTC+11)
timezones['KRAT'] = gettz('Asia/Krasnoyarsk') # Krasnoyarsk Time (UTC+07)
timezones['KST'] = gettz('Asia/Seoul') # Korea Standard Time (UTC+09)
timezones['LHST'] = gettz('Australia/Lord_Howe') # Lord Howe Standard Time (UTC+10:30)
timezones['LINT'] = gettz('Pacific/Kiritimati') # Line Islands Time (UTC+14)
timezones['MAGT'] = gettz('Asia/Magadan') # Magadan Time (UTC+12)
timezones['MART'] = gettz('Pacific/Marquesas') # Marquesas Islands Time (UTC09:30)
timezones['MAWT'] = gettz('Antarctica/Mawson') # Mawson Station Time (UTC+05)
timezones['MDT'] = gettz('America/Denver') # Mountain Daylight Time (North America) (UTC06)
timezones['MEST'] = gettz('Europe/Paris') # Middle European Summer Time Same zone as CEST (UTC+02)
timezones['MET'] = gettz('Europe/Berlin') # Middle European Time Same zone as CET (UTC+01)
timezones['MHT'] = gettz('Pacific/Kwajalein') # Marshall Islands (UTC+12)
timezones['MIST'] = gettz('Antarctica/Macquarie') # Macquarie Island Station Time (UTC+11)
timezones['MIT'] = gettz('Pacific/Marquesas') # Marquesas Islands Time (UTC09:30)
timezones['MMT'] = gettz('Asia/Rangoon') # Myanmar Standard Time (UTC+06:30)
timezones['MSK'] = gettz('Europe/Moscow') # Moscow Time (UTC+03)
timezones['MST'] = gettz('America/Denver') # Mountain Standard Time (North America) (UTC07)
timezones['MUT'] = gettz('Indian/Mauritius') # Mauritius Time (UTC+04)
timezones['MVT'] = gettz('Indian/Maldives') # Maldives Time (UTC+05)
timezones['MYT'] = gettz('Asia/Kuching') # Malaysia Time (UTC+08)
timezones['NCT'] = gettz('Pacific/Noumea') # New Caledonia Time (UTC+11)
timezones['NDT'] = gettz('Canada/Newfoundland') # Newfoundland Daylight Time (UTC02:30)
timezones['NFT'] = gettz('Pacific/Norfolk') # Norfolk Time (UTC+11)
timezones['NPT'] = gettz('Asia/Kathmandu') # Nepal Time (UTC+05:45)
timezones['NST'] = gettz('Canada/Newfoundland') # Newfoundland Standard Time (UTC03:30)
timezones['NT'] = gettz('Canada/Newfoundland') # Newfoundland Time (UTC03:30)
timezones['NUT'] = gettz('Pacific/Niue') # Niue Time (UTC11)
timezones['NZDT'] = gettz('Pacific/Auckland') # New Zealand Daylight Time (UTC+13)
timezones['NZST'] = gettz('Pacific/Auckland') # New Zealand Standard Time (UTC+12)
timezones['OMST'] = gettz('Asia/Omsk') # Omsk Time (UTC+06)
timezones['ORAT'] = gettz('Asia/Oral') # Oral Time (UTC+05)
timezones['PDT'] = gettz('America/Los_Angeles') # Pacific Daylight Time (North America) (UTC07)
timezones['PET'] = gettz('America/Lima') # Peru Time (UTC05)
timezones['PETT'] = gettz('Asia/Kamchatka') # Kamchatka Time (UTC+12)
timezones['PGT'] = gettz('Pacific/Port_Moresby') # Papua New Guinea Time (UTC+10)
timezones['PHOT'] = gettz('Pacific/Enderbury') # Phoenix Island Time (UTC+13)
timezones['PKT'] = gettz('Asia/Karachi') # Pakistan Standard Time (UTC+05)
timezones['PMDT'] = gettz('America/Miquelon') # Saint Pierre and Miquelon Daylight time (UTC02)
timezones['PMST'] = gettz('America/Miquelon') # Saint Pierre and Miquelon Standard Time (UTC03)
timezones['PONT'] = gettz('Pacific/Pohnpei') # Pohnpei Standard Time (UTC+11)
timezones['PST'] = gettz('America/Los_Angeles') # Pacific Standard Time (North America) (UTC08)
timezones['PYST'] = gettz('America/Asuncion') # Paraguay Summer Time (South America)[7] (UTC03)
timezones['PYT'] = gettz('America/Asuncion') # Paraguay Time (South America)[8] (UTC04)
timezones['RET'] = gettz('Indian/Reunion') # Réunion Time (UTC+04)
timezones['ROTT'] = gettz('Antarctica/Rothera') # Rothera Research Station Time (UTC03)
timezones['SAKT'] = gettz('Asia/Vladivostok') # Sakhalin Island time (UTC+11)
timezones['SAMT'] = gettz('Europe/Samara') # Samara Time (UTC+04)
timezones['SAST'] = gettz('Africa/Johannesburg') # South African Standard Time (UTC+02)
timezones['SBT'] = gettz('Pacific/Guadalcanal') # Solomon Islands Time (UTC+11)
timezones['SCT'] = gettz('Indian/Mahe') # Seychelles Time (UTC+04)
timezones['SGT'] = gettz('Asia/Singapore') # Singapore Time (UTC+08)
timezones['SLST'] = gettz('Asia/Colombo') # Sri Lanka Standard Time (UTC+05:30)
timezones['SRET'] = gettz('Asia/Srednekolymsk') # Srednekolymsk Time (UTC+11)
timezones['SRT'] = gettz('America/Paramaribo') # Suriname Time (UTC03)
timezones['SST'] = gettz('Asia/Singapore') # Singapore Standard Time (UTC+08)
timezones['SYOT'] = gettz('Antarctica/Syowa') # Showa Station Time (UTC+03)
timezones['TAHT'] = gettz('Pacific/Tahiti') # Tahiti Time (UTC10)
timezones['TFT'] = gettz('Indian/Kerguelen') # Indian/Kerguelen (UTC+05)
timezones['THA'] = gettz('Asia/Bangkok') # Thailand Standard Time (UTC+07)
timezones['TJT'] = gettz('Asia/Dushanbe') # Tajikistan Time (UTC+05)
timezones['TKT'] = gettz('Pacific/Fakaofo') # Tokelau Time (UTC+13)
timezones['TLT'] = gettz('Asia/Dili') # Timor Leste Time (UTC+09)
timezones['TMT'] = gettz('Asia/Ashgabat') # Turkmenistan Time (UTC+05)
timezones['TOT'] = gettz('Pacific/Tongatapu') # Tonga Time (UTC+13)
timezones['TVT'] = gettz('Pacific/Funafuti') # Tuvalu Time (UTC+12)
timezones['ULAST'] = gettz('Asia/Ulan_Bator') # Ulaanbaatar Summer Time (UTC+09)
timezones['ULAT'] = gettz('Asia/Ulan_Bator') # Ulaanbaatar Standard Time (UTC+08)
timezones['USZ1'] = gettz('Europe/Kaliningrad') # Kaliningrad Time (UTC+02)
timezones['UTC'] = gettz('UTC') # Coordinated Universal Time (UTC±00)
timezones['UYST'] = gettz('America/Montevideo') # Uruguay Summer Time (UTC02)
timezones['UYT'] = gettz('America/Montevideo') # Uruguay Standard Time (UTC03)
timezones['UZT'] = gettz('Asia/Tashkent') # Uzbekistan Time (UTC+05)
timezones['VET'] = gettz('America/Caracas') # Venezuelan Standard Time (UTC04)
timezones['VLAT'] = gettz('Asia/Vladivostok') # Vladivostok Time (UTC+10)
timezones['VOLT'] = gettz('Europe/Volgograd') # Volgograd Time (UTC+04)
timezones['VOST'] = gettz('Antarctica/Vostok') # Vostok Station Time (UTC+06)
timezones['VUT'] = gettz('Pacific/Efate') # Vanuatu Time (UTC+11)
timezones['WAKT'] = gettz('Pacific/Wake') # Wake Island Time (UTC+12)
timezones['WAST'] = gettz('Africa/Lagos') # West Africa Summer Time (UTC+02)
timezones['WAT'] = gettz('Africa/Lagos') # West Africa Time (UTC+01)
timezones['WEST'] = gettz('Europe/London') # Western European Summer Time (UTC+01)
timezones['WET'] = gettz('Europe/London') # Western European Time (UTC±00)
timezones['WIT'] = gettz('Asia/Jakarta') # Western Indonesian Time (UTC+07)
timezones['WST'] = gettz('Australia/Perth') # Western Standard Time (UTC+08)
timezones['YAKT'] = gettz('Asia/Yakutsk') # Yakutsk Time (UTC+09)
timezones['YEKT'] = gettz('Asia/Yekaterinburg') # Yekaterinburg Time (UTC+05)
return timezones

@ -1,4 +1,5 @@
import asyncio
from typing import Union
import discord
from redbot.core import Config, checks, commands
@ -61,10 +62,9 @@ class InfoChannel(Cog):
guild: discord.Guild = ctx.guild
channel_id = await self.config.guild(guild).channel_id()
channel = None
if channel_id is not None:
channel: discord.VoiceChannel = guild.get_channel(channel_id)
else:
channel: discord.VoiceChannel = None
channel: Union[discord.VoiceChannel, None] = guild.get_channel(channel_id)
if channel_id is not None and channel is None:
await ctx.send("Info channel has been deleted, recreate it?")

Loading…
Cancel
Save