From 1a5aaff268f6106afa04b573c07969162cd68932 Mon Sep 17 00:00:00 2001 From: bobloy Date: Tue, 25 Aug 2020 16:03:36 -0400 Subject: [PATCH 01/12] initial commit of FIFO, RedConfigJobStore is WIP --- fifo/__init__.py | 7 + fifo/fifo.py | 250 ++++++++++++++++++++++++++++++++++++ fifo/info.json | 18 +++ fifo/jobstores/redconfig.py | 35 +++++ 4 files changed, 310 insertions(+) create mode 100644 fifo/__init__.py create mode 100644 fifo/fifo.py create mode 100644 fifo/info.json create mode 100644 fifo/jobstores/redconfig.py diff --git a/fifo/__init__.py b/fifo/__init__.py new file mode 100644 index 0000000..dedd355 --- /dev/null +++ b/fifo/__init__.py @@ -0,0 +1,7 @@ +from .fifo import FIFO + + +async def setup(bot): + cog = FIFO(bot) + await cog.load_tasks() + bot.add_cog(cog) diff --git a/fifo/fifo.py b/fifo/fifo.py new file mode 100644 index 0000000..ae484d4 --- /dev/null +++ b/fifo/fifo.py @@ -0,0 +1,250 @@ +from typing import Dict, Union + +from apscheduler.executors.asyncio import AsyncIOExecutor +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.triggers.base import BaseTrigger +from apscheduler.triggers.combining import AndTrigger, OrTrigger +from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.interval import IntervalTrigger +from dateutil import parser +from redbot.core import Config, checks, commands +from redbot.core.bot import Red +from apscheduler.schedulers.asyncio import AsyncIOScheduler +import discord +import asyncio +import datetime + +from redbot.core.commands import DictConverter, TimedeltaConverter, parse_timedelta +from redbot.core.utils import AsyncIter + + +def get_trigger(data): + if data["type"] == "interval": + parsed_time = parse_timedelta(data["timedelta_str"]) + return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds) + + if data["type"] == "date": + return DateTrigger(parser.parse(data["strtime"])) + + if data["type"] == "cron": + return None # TODO: Cron parsing + + +def parse_triggers(data: Union[Dict, None]): + if data is None or not data.get("triggers", False): # No triggers + return None + + if len(data["triggers"]) > 1: # Multiple triggers + return OrTrigger(get_trigger(t_data) for t_data in data["triggers"]) + + return get_trigger(data[0]) + + +class Task: + + default_task_data = {"triggers": [], "command_str": ""} + + default_trigger = { + "type": "", + "timedelta_str": "", + } + + def __init__(self, name: str, guild_id, config: Config): + self.name = name + self.guild_id = guild_id + self.config = config + + self.data = None + + async def load_from_data(self, data: Dict): + self.data = data.copy() + + async def load_from_config(self): + self.data = await self.config.guild_from_id(self.guild_id).tasks.get_raw( + self.name, default=None + ) + return self.data + + async def get_trigger(self) -> Union[BaseTrigger, None]: + if self.data is None: + await self.load_from_config() + + return parse_triggers(self.data) + + # async def set_job_id(self, job_id): + # if self.data is None: + # await self.load_from_config() + # + # self.data["job_id"] = job_id + + async def save_data(self): + await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=self.data) + + async def execute(self): + pass # TODO: something something invoke command + + async def add_trigger(self, param, parsed_time): + pass + + +class FIFO(commands.Cog): + """ + Simple Scheduling Cog + + Named after the simplest scheduling algorithm: First In First Out + """ + + def __init__(self, bot: Red): + super().__init__() + self.bot = bot + self.config = Config.get_conf(self, identifier=70737079, force_registration=True) + + default_global = {"jobs_index": {}, "jobs": []} + default_guild = {"tasks": {}} + + self.config.register_global(**default_global) + self.config.register_guild(**default_guild) + + jobstores = {"default": MemoryJobStore()} + + job_defaults = {"coalesce": False, "max_instances": 1} + + # executors = {"default": AsyncIOExecutor()} + + # Default executor is already AsyncIOExecutor + self.scheduler = AsyncIOScheduler( + jobstores=jobstores, job_defaults=job_defaults + ) + + async def red_delete_data_for_user(self, **kwargs): + """Nothing to delete""" + return + + async def _parse_command(self, command_to_parse: str): + return False # TODO: parse commands somehow + + @checks.is_owner() # Will be reduced when I figure out permissions later + @commands.group() + async def fifo(self, ctx: commands.Context): + """ + Base command for handling scheduling of tasks + """ + if ctx.invoked_subcommand is None: + pass + + @fifo.command(name="list") + async def fifo_list(self, ctx: commands.Context, all_guilds: bool = False): + """ + Lists all current tasks and their triggers. + + Do `[p]fifo list True` to see tasks from all guilds + """ + if all_guilds: + pass + else: + pass # TODO: parse and display tasks + + @fifo.command(name="add") + async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str): + """ + Add a new task to this guild's task list + """ + pass + + @fifo.command(name="delete") + async def fifo_delete(self, ctx: commands.Context, task_name: str, *, command_to_execute: str): + """ + Deletes a task from this guild's task list + """ + pass + + @fifo.group(name="trigger") + async def fifo_trigger(self, ctx: commands.Context): + """ + Add a new trigger for a task from the current guild. + """ + if ctx.invoked_subcommand is None: + pass + + @fifo_trigger.command(name="interval") + async def fifo_trigger_interval( + self, ctx: commands.Context, task_name: str, interval_str: TimedeltaConverter + ): + """ + Add an interval trigger to the specified task + """ + + task = Task(task_name, ctx.guild.id, self.config) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + result = await task.add_trigger("interval", interval_str) + if not result: + await ctx.maybe_send_embed( + "Failed to add an interval trigger to this task, see console for logs" + ) + return + await ctx.tick() + + @fifo_trigger.command(name="date") + async def fifo_trigger_date( + self, ctx: commands.Context, task_name: str, datetime_str: TimedeltaConverter + ): + """ + Add a "run once" datetime trigger to the specified task + """ + + task = Task(task_name, ctx.guild.id, self.config) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + result = await task.add_trigger("date", datetime_str) + if not result: + await ctx.maybe_send_embed( + "Failed to add a date trigger to this task, see console for logs" + ) + return + await ctx.tick() + + @fifo_trigger.command(name="cron") + async def fifo_trigger_cron( + self, ctx: commands.Context, task_name: str, cron_settings: DictConverter + ): + """ + Add a "time of day" trigger to the specified task + """ + await ctx.maybe_send_embed("Not yet implemented") + + async def load_tasks(self): + """ + Run once on cog load. + """ + all_guilds = await self.config.all_guilds() + async for guild_id, guild_data in AsyncIter(all_guilds["tasks"].items(), steps=100): + for task_name, task_data in guild_data["tasks"].items(): + task = Task(task_name, guild_id, self.config) + await task.load_from_data(task_data) + + job = self.scheduler.add_job( + task.execute, id=task_name + "_" + guild_id, trigger=await task.get_trigger(), + ) + + self.scheduler.start() + + # async def parent_loop(self): + # await asyncio.sleep(60) + # asyncio.create_task(self.process_tasks(datetime.datetime.utcnow())) + # + # async def process_tasks(self, now: datetime.datetime): + # for task in self.tasks: + # pass diff --git a/fifo/info.json b/fifo/info.json new file mode 100644 index 0000000..4a9cd1c --- /dev/null +++ b/fifo/info.json @@ -0,0 +1,18 @@ +{ + "author": [ + "Bobloy" + ], + "min_bot_version": "3.3.0", + "description": "Schedule commands to be run by certain at certain times or intervals", + "hidden": false, + "install_msg": "Thank you for installing FIFO.\nGet started with `[p]load fifo`, then `[p]help FIFO`", + "short": "Schedule commands to be run by certain at certain times or intervals\"", + "end_user_data_statement": "This cog does not store any End User Data", + "tags": [ + "bobloy", + "utilities", + "tools", + "tool", + "roles" + ] +} \ No newline at end of file diff --git a/fifo/jobstores/redconfig.py b/fifo/jobstores/redconfig.py new file mode 100644 index 0000000..0707b6e --- /dev/null +++ b/fifo/jobstores/redconfig.py @@ -0,0 +1,35 @@ +import asyncio + +from apscheduler.jobstores.base import BaseJobStore +from redbot.core import Config + + +class RedConfigJobStore(BaseJobStore): + def __init__(self, config: Config, loop): + super().__init__() + self.config = config + self.loop: asyncio.BaseEventLoop = loop + + def lookup_job(self, job_id): + task = self.loop.create_task(self.config.jobs_index.get_raw(job_id)) + + def get_due_jobs(self, now): + pass + + def get_next_run_time(self): + pass + + def get_all_jobs(self): + pass + + def add_job(self, job): + pass + + def update_job(self, job): + pass + + def remove_job(self, job_id): + pass + + def remove_all_jobs(self): + pass From c6a9116a9279b66178a6c0882a9cd4b6f24fe229 Mon Sep 17 00:00:00 2001 From: bobloy Date: Wed, 26 Aug 2020 17:36:04 -0400 Subject: [PATCH 02/12] Almost to adding triggers --- fifo/__init__.py | 1 - fifo/datetimeconverter.py | 16 +++ fifo/fifo.py | 252 +++++++++++++++++++++++++++++------- fifo/jobstores/redconfig.py | 35 ----- fifo/redconfigjobstore.py | 189 +++++++++++++++++++++++++++ 5 files changed, 410 insertions(+), 83 deletions(-) create mode 100644 fifo/datetimeconverter.py delete mode 100644 fifo/jobstores/redconfig.py create mode 100644 fifo/redconfigjobstore.py diff --git a/fifo/__init__.py b/fifo/__init__.py index dedd355..860ab97 100644 --- a/fifo/__init__.py +++ b/fifo/__init__.py @@ -3,5 +3,4 @@ from .fifo import FIFO async def setup(bot): cog = FIFO(bot) - await cog.load_tasks() bot.add_cog(cog) diff --git a/fifo/datetimeconverter.py b/fifo/datetimeconverter.py new file mode 100644 index 0000000..bdfbf88 --- /dev/null +++ b/fifo/datetimeconverter.py @@ -0,0 +1,16 @@ +from datetime import datetime +from typing import TYPE_CHECKING + +from discord.ext.commands import BadArgument, Converter +from dateutil import parser + + +if TYPE_CHECKING: + DatetimeConverter = datetime +else: + class DatetimeConverter(Converter): + async def convert(self, ctx, argument) -> datetime: + dt = parser.parse(argument) + if dt is not None: + return dt + raise BadArgument() diff --git a/fifo/fifo.py b/fifo/fifo.py index ae484d4..1ffb612 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -1,34 +1,34 @@ +from datetime import datetime, timedelta from typing import Dict, Union -from apscheduler.executors.asyncio import AsyncIOExecutor -from apscheduler.jobstores.memory import MemoryJobStore +import discord +from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.base import BaseTrigger -from apscheduler.triggers.combining import AndTrigger, OrTrigger +from apscheduler.triggers.combining import OrTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger from dateutil import parser from redbot.core import Config, checks, commands from redbot.core.bot import Red -from apscheduler.schedulers.asyncio import AsyncIOScheduler -import discord -import asyncio -import datetime - from redbot.core.commands import DictConverter, TimedeltaConverter, parse_timedelta -from redbot.core.utils import AsyncIter + +from .datetimeconverter import DatetimeConverter +from .redconfigjobstore import RedConfigJobStore def get_trigger(data): if data["type"] == "interval": - parsed_time = parse_timedelta(data["timedelta_str"]) + parsed_time = data["time_data"] return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds) if data["type"] == "date": - return DateTrigger(parser.parse(data["strtime"])) + return DateTrigger(data["time_data"]) if data["type"] == "cron": return None # TODO: Cron parsing + return False + def parse_triggers(data: Union[Dict, None]): if data is None or not data.get("triggers", False): # No triggers @@ -40,33 +40,95 @@ def parse_triggers(data: Union[Dict, None]): return get_trigger(data[0]) -class Task: +class FakeMessage: + _state = None + +# class FakeMessage(discord.Message): +# def __init__(self): +# super().__init__(state=None, channel=None, data=None) + + +class Task: default_task_data = {"triggers": [], "command_str": ""} default_trigger = { "type": "", - "timedelta_str": "", + "time_data": None, # Used for Interval and Date Triggers } - def __init__(self, name: str, guild_id, config: Config): + def __init__(self, name: str, guild_id, config: Config, author_id=None, bot: Red = None): self.name = name self.guild_id = guild_id self.config = config - + self.bot = bot + self.author_id = author_id self.data = None - async def load_from_data(self, data: Dict): - self.data = data.copy() + async def _encode_time_data(self): + if not self.data or not self.data.get("triggers", None): + return None + + triggers = [] + for t in self.data["triggers"]: + if t["type"] == "interval": # Convert into timedelta + td: timedelta = t["time_data"] + + triggers.append({"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds} }) + + if t["type"] == "date": # Convert into datetime + dt: datetime = t["time_data"] + triggers.append({"type": t["type"], "time_data": { + "year": dt.year, + "month": dt.month, + "day": dt.day, + "hour": dt.hour, + "minute": dt.minute, + "second": dt.second, + }}) + + if t["type"] == "cron": + raise NotImplemented + raise NotImplemented + + return triggers + + async def _decode_time_data(self): + if not self.data or not self.data.get("triggers", None): + return + + for t in self.data["triggers"]: + if t["type"] == "interval": # Convert into timedelta + t["time_data"] = timedelta(**t["time_data"]) + + if t["type"] == "date": # Convert into datetime + t["time_data"] = datetime(**t["time_data"]) + + if t["type"] == "cron": + raise NotImplemented + raise NotImplemented + + # async def load_from_data(self, data: Dict): + # self.data = data.copy() async def load_from_config(self): - self.data = await self.config.guild_from_id(self.guild_id).tasks.get_raw( + data = await self.config.guild_from_id(self.guild_id).tasks.get_raw( self.name, default=None ) + + if not data: + return + + self.author_id = data["author_id"] + self.guild_id = data["guild_id"] + + self.data = data["data"] + + await self._decode_time_data() return self.data async def get_trigger(self) -> Union[BaseTrigger, None]: - if self.data is None: + if not self.data: await self.load_from_config() return parse_triggers(self.data) @@ -77,14 +139,69 @@ class Task: # # self.data["job_id"] = job_id + async def save_all(self): + """To be used when creating an new task""" + + data_to_save = self.default_task_data.copy() + if self.data: + data_to_save["command_str"] = self.data.get("command_str", "") + data_to_save["triggers"] = await self._encode_time_data() + + to_save = { + "guild_id": self.guild_id, + "author_id": self.author_id, + "data": data_to_save, + } + await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save) + async def save_data(self): - await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=self.data) + """To be used when updating triggers""" + if not self.data: + return + await self.config.guild_from_id(self.guild_id).tasks.set_raw( + self.name, "data", value=await self._encode_time_data() + ) async def execute(self): - pass # TODO: something something invoke command + if not self.data or self.data["command_str"]: + return False + message = FakeMessage() + message.guild = self.bot.get_guild(self.guild_id) # used for get_prefix + message.author = message.guild.get_member(self.author_id) + message.content = await self.bot.get_prefix(message) + self.data["command_str"] - async def add_trigger(self, param, parsed_time): - pass + if not message.guild or not message.author or not message.content: + return False + + new_ctx: commands.Context = await self.bot.get_context(message) + if not new_ctx.valid: + return False + + await self.bot.invoke(new_ctx) + return True + + async def set_bot(self, bot: Red): + self.bot = bot + + async def set_author(self, author: Union[discord.User, str]): + self.author_id = getattr(author, "id", None) or author + + async def set_commmand_str(self, command_str): + if not self.data: + self.data = self.default_task_data.copy() + self.data["command_str"] = command_str + return True + + async def add_trigger(self, param, parsed_time: Union[timedelta, datetime]): + trigger_data = {"type": param, "time_data": parsed_time} + if not get_trigger(trigger_data): + return False + + if not self.data: + self.data = self.default_task_data.copy() + + self.data["triggers"].append(trigger_data) + return True class FIFO(commands.Cog): @@ -105,23 +222,50 @@ class FIFO(commands.Cog): self.config.register_global(**default_global) self.config.register_guild(**default_guild) - jobstores = {"default": MemoryJobStore()} + jobstores = {"default": RedConfigJobStore(self.config, self.bot)} job_defaults = {"coalesce": False, "max_instances": 1} # executors = {"default": AsyncIOExecutor()} # Default executor is already AsyncIOExecutor - self.scheduler = AsyncIOScheduler( - jobstores=jobstores, job_defaults=job_defaults - ) + self.scheduler = AsyncIOScheduler(jobstores=jobstores, job_defaults=job_defaults) + + self.scheduler.start() async def red_delete_data_for_user(self, **kwargs): """Nothing to delete""" return - async def _parse_command(self, command_to_parse: str): - return False # TODO: parse commands somehow + def _assemble_job_id(self, task_name, guild_id): + return task_name + "_" + guild_id + + async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str): + message = FakeMessage() + message.content = ctx.prefix + command_to_parse + message.author = ctx.author + message.guild = ctx.guild + + new_ctx: commands.Context = await self.bot.get_context(message) + + return new_ctx.valid + + async def _get_job(self, task_name, guild_id): + return self.scheduler.get_job(self._assemble_job_id(task_name, guild_id)) + + async def _add_job(self, task): + return self.scheduler.add_job( + task.execute, + id=self._assemble_job_id(task.name, task.guild_id), + trigger=await task.get_trigger(), + ) + + @checks.is_owner() + @commands.command() + async def fifoclear(self, ctx: commands.Context): + """Debug command to clear fifo config""" + await self.config.guild(ctx.guild).tasks.clear() + await ctx.tick() @checks.is_owner() # Will be reduced when I figure out permissions later @commands.group() @@ -149,10 +293,21 @@ class FIFO(commands.Cog): """ Add a new task to this guild's task list """ - pass + if (await self.config.guild(ctx.guild).tasks.get_raw(task_name, default=None)) is not None: + await ctx.maybe_send_embed(f"Task already exists with {task_name=}") + return + + if not await self._check_parsable_command(ctx, command_to_execute): + await ctx.maybe_send_embed("Failed to parse command. Make sure to include the prefix") + return + + task = Task(task_name, ctx.guild.id, self.config, ctx.author.id) + await task.set_commmand_str(command_to_execute) + await task.save_all() + await ctx.tick() @fifo.command(name="delete") - async def fifo_delete(self, ctx: commands.Context, task_name: str, *, command_to_execute: str): + async def fifo_delete(self, ctx: commands.Context, task_name: str): """ Deletes a task from this guild's task list """ @@ -189,11 +344,12 @@ class FIFO(commands.Cog): "Failed to add an interval trigger to this task, see console for logs" ) return + await task.save_data() await ctx.tick() @fifo_trigger.command(name="date") async def fifo_trigger_date( - self, ctx: commands.Context, task_name: str, datetime_str: TimedeltaConverter + self, ctx: commands.Context, task_name: str, datetime_str: DatetimeConverter ): """ Add a "run once" datetime trigger to the specified task @@ -214,6 +370,8 @@ class FIFO(commands.Cog): "Failed to add a date trigger to this task, see console for logs" ) return + + await task.save_data() await ctx.tick() @fifo_trigger.command(name="cron") @@ -225,21 +383,21 @@ class FIFO(commands.Cog): """ await ctx.maybe_send_embed("Not yet implemented") - async def load_tasks(self): - """ - Run once on cog load. - """ - all_guilds = await self.config.all_guilds() - async for guild_id, guild_data in AsyncIter(all_guilds["tasks"].items(), steps=100): - for task_name, task_data in guild_data["tasks"].items(): - task = Task(task_name, guild_id, self.config) - await task.load_from_data(task_data) - - job = self.scheduler.add_job( - task.execute, id=task_name + "_" + guild_id, trigger=await task.get_trigger(), - ) - - self.scheduler.start() + # async def load_tasks(self): + # """ + # Run once on cog load. + # """ + # all_guilds = await self.config.all_guilds() + # async for guild_id, guild_data in AsyncIter(all_guilds["tasks"].items(), steps=100): + # for task_name, task_data in guild_data["tasks"].items(): + # task = Task(task_name, guild_id, self.config) + # await task.load_from_data(task_data) + # + # job = self.scheduler.add_job( + # task.execute, id=task_name + "_" + guild_id, trigger=await task.get_trigger(), + # ) + # + # self.scheduler.start() # async def parent_loop(self): # await asyncio.sleep(60) diff --git a/fifo/jobstores/redconfig.py b/fifo/jobstores/redconfig.py deleted file mode 100644 index 0707b6e..0000000 --- a/fifo/jobstores/redconfig.py +++ /dev/null @@ -1,35 +0,0 @@ -import asyncio - -from apscheduler.jobstores.base import BaseJobStore -from redbot.core import Config - - -class RedConfigJobStore(BaseJobStore): - def __init__(self, config: Config, loop): - super().__init__() - self.config = config - self.loop: asyncio.BaseEventLoop = loop - - def lookup_job(self, job_id): - task = self.loop.create_task(self.config.jobs_index.get_raw(job_id)) - - def get_due_jobs(self, now): - pass - - def get_next_run_time(self): - pass - - def get_all_jobs(self): - pass - - def add_job(self, job): - pass - - def update_job(self, job): - pass - - def remove_job(self, job_id): - pass - - def remove_all_jobs(self): - pass diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py new file mode 100644 index 0000000..9db7213 --- /dev/null +++ b/fifo/redconfigjobstore.py @@ -0,0 +1,189 @@ +import asyncio + +from apscheduler.jobstores.base import ConflictingIdError, JobLookupError +from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.util import datetime_to_utc_timestamp +from redbot.core import Config + + +# TODO: use get_lock on config +from redbot.core.bot import Red + + + +class RedConfigJobStore(MemoryJobStore): + def __init__(self, config: Config, bot: Red): + super().__init__() + self.config = config + # nest_asyncio.apply() + self.bot = bot + asyncio.ensure_future(self._load_from_config(), loop=self.bot.loop) + + async def _load_from_config(self): + self._jobs = await self.config.jobs() + self._jobs_index = await self.config.jobs_index.all() + + def add_job(self, job): + if job.id in self._jobs_index: + raise ConflictingIdError(job.id) + + timestamp = datetime_to_utc_timestamp(job.next_run_time) + index = self._get_job_index(timestamp, job.id) # This is fine + self._jobs.insert(index, (job, timestamp)) + self._jobs_index[job.id] = (job, timestamp) + asyncio.create_task(self._async_add_job(job, index, timestamp)) + + async def _async_add_job(self, job, index, timestamp): + async with self.config.jobs() as jobs: + jobs.insert(index, (job, timestamp)) + await self.config.jobs_index.set_raw(job.id, value=(job, timestamp)) + return True + + def update_job(self, job): + old_job, old_timestamp = self._jobs_index.get(job.id, (None, None)) + if old_job is None: + raise JobLookupError(job.id) + + # If the next run time has not changed, simply replace the job in its present index. + # Otherwise, reinsert the job to the list to preserve the ordering. + old_index = self._get_job_index(old_timestamp, old_job.id) + new_timestamp = datetime_to_utc_timestamp(job.next_run_time) + asyncio.ensure_future(self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), loop=self.bot.loop) + + async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): + if old_timestamp == new_timestamp: + self._jobs[old_index] = (job, new_timestamp) + async with self.config.jobs() as jobs: + jobs[old_index] = (job, new_timestamp) + else: + del self._jobs[old_index] + new_index = self._get_job_index(new_timestamp, job.id) # This is fine + self._jobs.insert(new_index, (job, new_timestamp)) + async with self.config.jobs() as jobs: + del jobs[old_index] + jobs.insert(new_index, (job, new_timestamp)) + self._jobs_index[old_job.id] = (job, new_timestamp) + await self.config.jobs_index.set_raw(old_job.id, value=(job, new_timestamp)) + + def remove_job(self, job_id): + job, timestamp = self._jobs_index.get(job_id, (None, None)) + if job is None: + raise JobLookupError(job_id) + + index = self._get_job_index(timestamp, job_id) + del self._jobs[index] + del self._jobs_index[job.id] + asyncio.create_task(self._async_remove_job(index, job)) + + async def _async_remove_job(self, index, job): + async with self.config.jobs() as jobs: + del jobs[index] + await self.config.jobs_index.clear_raw(job.id) + + def remove_all_jobs(self): + super().remove_all_jobs() + asyncio.create_task(self._async_remove_all_jobs()) + + async def _async_remove_all_jobs(self): + await self.config.jobs.clear() + await self.config.jobs_index.clear() + + +# import asyncio +# +# from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError +# from apscheduler.util import datetime_to_utc_timestamp +# from redbot.core import Config +# from redbot.core.utils import AsyncIter +# +# +# class RedConfigJobStore(BaseJobStore): +# def __init__(self, config: Config, loop): +# super().__init__() +# self.config = config +# self.loop: asyncio.BaseEventLoop = loop +# +# self._jobs = [] +# self._jobs_index = {} # id -> (job, timestamp) lookup table +# +# def lookup_job(self, job_id): +# return asyncio.run(self._async_lookup_job(job_id)) +# +# async def _async_lookup_job(self, job_id): +# return (await self.config.jobs_index.get_raw(job_id, default=(None, None)))[0] +# +# def get_due_jobs(self, now): +# return asyncio.run(self._async_get_due_jobs(now)) +# +# async def _async_get_due_jobs(self, now): +# now_timestamp = datetime_to_utc_timestamp(now) +# pending = [] +# all_jobs = await self.config.jobs() +# async for job, timestamp in AsyncIter(all_jobs, steps=100): +# if timestamp is None or timestamp > now_timestamp: +# break +# pending.append(job) +# +# return pending +# +# def get_next_run_time(self): +# return asyncio.run(self._async_get_next_run_time()) +# +# async def _async_get_next_run_time(self): +# _jobs = await self.config.jobs() +# return _jobs[0][0].next_run_time if _jobs else None +# +# def get_all_jobs(self): +# return asyncio.run(self._async_get_all_jobs()) +# +# async def _async_get_all_jobs(self): +# return [j[0] for j in (await self.config.jobs())] +# +# def add_job(self, job): +# return asyncio.run(self._async_add_job(job)) +# +# async def _async_add_job(self, job): +# if await self.config.jobs_index.get_raw(job.id, default=None) is not None: +# raise ConflictingIdError(job.id) +# +# timestamp = datetime_to_utc_timestamp(job.next_run_time) +# index = self._get_job_index(timestamp, job.id) +# self._jobs.insert(index, (job, timestamp)) +# self._jobs_index[job.id] = (job, timestamp) +# +# def update_job(self, job): +# pass +# +# def remove_job(self, job_id): +# pass +# +# def remove_all_jobs(self): +# pass +# +# def _get_job_index(self, timestamp, job_id): +# """ +# Returns the index of the given job, or if it's not found, the index where the job should be +# inserted based on the given timestamp. +# +# :type timestamp: int +# :type job_id: str +# +# """ +# lo, hi = 0, len(self._jobs) +# timestamp = float('inf') if timestamp is None else timestamp +# while lo < hi: +# mid = (lo + hi) // 2 +# mid_job, mid_timestamp = self._jobs[mid] +# mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp +# if mid_timestamp > timestamp: +# hi = mid +# elif mid_timestamp < timestamp: +# lo = mid + 1 +# elif mid_job.id > job_id: +# hi = mid +# elif mid_job.id < job_id: +# lo = mid + 1 +# else: +# return mid +# +# return lo From e602b5c868e546bcdd3d736b2fb28b8f5fc3b713 Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 27 Aug 2020 16:22:36 -0400 Subject: [PATCH 03/12] Almost working. Date time is only date, figure out what's going on with time. --- fifo/fifo.py | 214 ++++++++++++++++++++++++++++++-------- fifo/info.json | 3 + fifo/redconfigjobstore.py | 83 +++++++++++++-- fifo/redjob.py | 44 ++++++++ 4 files changed, 287 insertions(+), 57 deletions(-) create mode 100644 fifo/redjob.py diff --git a/fifo/fifo.py b/fifo/fifo.py index 1ffb612..c5ea125 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -1,19 +1,32 @@ +import logging from datetime import datetime, timedelta from typing import Dict, Union import discord +from apscheduler.job import Job from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.base import BaseTrigger from apscheduler.triggers.combining import OrTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger -from dateutil import parser from redbot.core import Config, checks, commands from redbot.core.bot import Red -from redbot.core.commands import DictConverter, TimedeltaConverter, parse_timedelta +from redbot.core.commands import DictConverter, TimedeltaConverter from .datetimeconverter import DatetimeConverter -from .redconfigjobstore import RedConfigJobStore + +log = logging.getLogger("red.fox_v3.fifo") +schedule_log = logging.getLogger("red.fox_v3.fifo.scheduler") +schedule_log.setLevel(logging.DEBUG) +log.setLevel(logging.DEBUG) + + +async def _execute_task(task_state): + log.info(f"Executing {task_state=}") + task = Task(**task_state) + if await task.load_from_config(): + return await task.execute() + return False def get_trigger(data): @@ -37,11 +50,14 @@ def parse_triggers(data: Union[Dict, None]): if len(data["triggers"]) > 1: # Multiple triggers return OrTrigger(get_trigger(t_data) for t_data in data["triggers"]) - return get_trigger(data[0]) + return get_trigger(data["triggers"][0]) -class FakeMessage: - _state = None +# class FakeMessage(discord.Message): +# def __init__(self, *, state, channel, data): +# super().__init__(state=state, channel=channel, data=data) +# +# _state = None # class FakeMessage(discord.Message): @@ -57,35 +73,47 @@ class Task: "time_data": None, # Used for Interval and Date Triggers } - def __init__(self, name: str, guild_id, config: Config, author_id=None, bot: Red = None): + def __init__( + self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None + ): self.name = name self.guild_id = guild_id self.config = config self.bot = bot self.author_id = author_id + self.channel_id = channel_id self.data = None - async def _encode_time_data(self): + async def _encode_time_triggers(self): if not self.data or not self.data.get("triggers", None): - return None + return [] triggers = [] for t in self.data["triggers"]: if t["type"] == "interval": # Convert into timedelta td: timedelta = t["time_data"] - triggers.append({"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds} }) + triggers.append( + {"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}} + ) + continue if t["type"] == "date": # Convert into datetime dt: datetime = t["time_data"] - triggers.append({"type": t["type"], "time_data": { - "year": dt.year, - "month": dt.month, - "day": dt.day, - "hour": dt.hour, - "minute": dt.minute, - "second": dt.second, - }}) + triggers.append( + { + "type": t["type"], + "time_data": { + "year": dt.year, + "month": dt.month, + "day": dt.day, + "hour": dt.hour, + "minute": dt.minute, + "second": dt.second, + }, + } + ) + continue if t["type"] == "cron": raise NotImplemented @@ -93,16 +121,18 @@ class Task: return triggers - async def _decode_time_data(self): + async def _decode_time_triggers(self): if not self.data or not self.data.get("triggers", None): return - for t in self.data["triggers"]: + for n, t in enumerate(self.data["triggers"]): if t["type"] == "interval": # Convert into timedelta - t["time_data"] = timedelta(**t["time_data"]) + self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"]) + continue if t["type"] == "date": # Convert into datetime - t["time_data"] = datetime(**t["time_data"]) + self.data["triggers"][n]["time_data"] = datetime(**t["time_data"]) + continue if t["type"] == "cron": raise NotImplemented @@ -121,10 +151,11 @@ class Task: self.author_id = data["author_id"] self.guild_id = data["guild_id"] + self.channel_id = data["channel_id"] self.data = data["data"] - await self._decode_time_data() + await self._decode_time_triggers() return self.data async def get_trigger(self) -> Union[BaseTrigger, None]: @@ -145,11 +176,12 @@ class Task: data_to_save = self.default_task_data.copy() if self.data: data_to_save["command_str"] = self.data.get("command_str", "") - data_to_save["triggers"] = await self._encode_time_data() + data_to_save["triggers"] = await self._encode_time_triggers() to_save = { "guild_id": self.guild_id, "author_id": self.author_id, + "channel_id": self.channel_id, "data": data_to_save, } await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save) @@ -158,23 +190,54 @@ class Task: """To be used when updating triggers""" if not self.data: return + + data_to_save = self.data.copy() + data_to_save["triggers"] = await self._encode_time_triggers() + await self.config.guild_from_id(self.guild_id).tasks.set_raw( - self.name, "data", value=await self._encode_time_data() + self.name, "data", value=data_to_save ) async def execute(self): - if not self.data or self.data["command_str"]: + if not self.data or not self.data.get("command_str", False): + log.warning(f"Could not execute task due to data problem: {self.data=}") return False - message = FakeMessage() - message.guild = self.bot.get_guild(self.guild_id) # used for get_prefix - message.author = message.guild.get_member(self.author_id) - message.content = await self.bot.get_prefix(message) + self.data["command_str"] + + guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix + if guild is None: + log.warning(f"Could not execute task due to missing guild: {self.guild_id}") + return False + channel: discord.TextChannel = guild.get_channel(self.channel_id) + if channel is None: + log.warning(f"Could not execute task due to missing channel: {self.channel_id}") + return False + author: discord.User = guild.get_member(self.author_id) + if author is None: + log.warning(f"Could not execute task due to missing author: {self.author_id}") + return False + + message = channel.last_message + if message is None: + log.warning("No message found in channel cache yet, skipping execution") + return + message.author = author + + prefixes = await self.bot.get_prefix(message) + if isinstance(prefixes, str): + prefix = prefixes + else: + prefix = prefixes[0] + + message.content = f"{prefix}{self.data['command_str']}" if not message.guild or not message.author or not message.content: + log.warning(f"Could not execute task due to message problem: {message}") return False new_ctx: commands.Context = await self.bot.get_context(message) + new_ctx.assume_yes = True if not new_ctx.valid: + log.warning(f"Could not execute task due invalid context: {new_ctx}") return False await self.bot.invoke(new_ctx) @@ -203,6 +266,31 @@ class Task: self.data["triggers"].append(trigger_data) return True + def __setstate__(self, task_state): + self.name = task_state["name"] + self.guild_id = task_state["guild_id"] + self.config = task_state["config"] + self.bot = None + self.author_id = None + self.channel_id = None + self.data = None + + def __getstate__(self): + return { + "name": self.name, + "guild_id": self.guild_id, + "config": self.config, + "bot": self.bot, + } + + +def _assemble_job_id(task_name, guild_id): + return f"{task_name}_{guild_id}" + + +def _disassemble_job_id(job_id: str): + return job_id.split("_") + class FIFO(commands.Cog): """ @@ -222,6 +310,8 @@ class FIFO(commands.Cog): self.config.register_global(**default_global) self.config.register_guild(**default_guild) + from .redconfigjobstore import RedConfigJobStore + jobstores = {"default": RedConfigJobStore(self.config, self.bot)} job_defaults = {"coalesce": False, "max_instances": 1} @@ -229,7 +319,9 @@ class FIFO(commands.Cog): # executors = {"default": AsyncIOExecutor()} # Default executor is already AsyncIOExecutor - self.scheduler = AsyncIOScheduler(jobstores=jobstores, job_defaults=job_defaults) + self.scheduler = AsyncIOScheduler( + jobstores=jobstores, job_defaults=job_defaults, logger=schedule_log + ) self.scheduler.start() @@ -237,34 +329,48 @@ class FIFO(commands.Cog): """Nothing to delete""" return - def _assemble_job_id(self, task_name, guild_id): - return task_name + "_" + guild_id - async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str): - message = FakeMessage() + message: discord.Message = ctx.message + message.content = ctx.prefix + command_to_parse message.author = ctx.author - message.guild = ctx.guild new_ctx: commands.Context = await self.bot.get_context(message) return new_ctx.valid - async def _get_job(self, task_name, guild_id): - return self.scheduler.get_job(self._assemble_job_id(task_name, guild_id)) + async def _process_task(self, task: Task): + job = await self._get_job(task) + if job is not None: + job.remove() - async def _add_job(self, task): + return await self._add_job(task) + + async def _get_job(self, task: Task) -> Job: + return self.scheduler.get_job(_assemble_job_id(task.name, task.guild_id)) + + async def _add_job(self, task: Task): return self.scheduler.add_job( - task.execute, - id=self._assemble_job_id(task.name, task.guild_id), + _execute_task, + args=[task.__getstate__()], + id=_assemble_job_id(task.name, task.guild_id), trigger=await task.get_trigger(), ) + async def _pause_job(self, task: Task): + return self.scheduler.pause_job(job_id=_assemble_job_id(task.name, task.guild_id)) + + async def _remove_job(self, task: Task): + return self.scheduler.remove_job(job_id=_assemble_job_id(task.name, task.guild_id)) + @checks.is_owner() @commands.command() async def fifoclear(self, ctx: commands.Context): - """Debug command to clear fifo config""" + """Debug command to clear all current fifo data""" await self.config.guild(ctx.guild).tasks.clear() + await self.config.jobs.clear() + await self.config.jobs_index.clear() + self.scheduler.remove_all_jobs() await ctx.tick() @checks.is_owner() # Will be reduced when I figure out permissions later @@ -286,7 +392,15 @@ class FIFO(commands.Cog): if all_guilds: pass else: - pass # TODO: parse and display tasks + out = "" + all_tasks = await self.config.guild(ctx.guild).tasks() + for task_name, task_data in all_tasks.items(): + out += f"{task_name}: {task_data}\n" + + if out: + await ctx.maybe_send_embed(out) + else: + await ctx.maybe_send_embed("No tasks to list") @fifo.command(name="add") async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str): @@ -297,11 +411,17 @@ class FIFO(commands.Cog): await ctx.maybe_send_embed(f"Task already exists with {task_name=}") return + if "_" in task_name: # See _disassemble_job_id + await ctx.maybe_send_embed("Task name cannot contain underscores") + return + if not await self._check_parsable_command(ctx, command_to_execute): - await ctx.maybe_send_embed("Failed to parse command. Make sure to include the prefix") + await ctx.maybe_send_embed( + "Failed to parse command. Make sure not to include the prefix" + ) return - task = Task(task_name, ctx.guild.id, self.config, ctx.author.id) + task = Task(task_name, ctx.guild.id, self.config, ctx.author.id, ctx.channel.id, self.bot) await task.set_commmand_str(command_to_execute) await task.save_all() await ctx.tick() @@ -329,7 +449,7 @@ class FIFO(commands.Cog): Add an interval trigger to the specified task """ - task = Task(task_name, ctx.guild.id, self.config) + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) await task.load_from_config() if task.data is None: @@ -345,6 +465,7 @@ class FIFO(commands.Cog): ) return await task.save_data() + await self._process_task(task) await ctx.tick() @fifo_trigger.command(name="date") @@ -372,6 +493,7 @@ class FIFO(commands.Cog): return await task.save_data() + await self._process_task(task) await ctx.tick() @fifo_trigger.command(name="cron") diff --git a/fifo/info.json b/fifo/info.json index 4a9cd1c..6cc6f9d 100644 --- a/fifo/info.json +++ b/fifo/info.json @@ -8,6 +8,9 @@ "install_msg": "Thank you for installing FIFO.\nGet started with `[p]load fifo`, then `[p]help FIFO`", "short": "Schedule commands to be run by certain at certain times or intervals\"", "end_user_data_statement": "This cog does not store any End User Data", + "requirements": [ + "apscheduler" + ], "tags": [ "bobloy", "utilities", diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index 9db7213..4921dca 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -1,42 +1,96 @@ import asyncio +import base64 +import logging +import pickle +from apscheduler.job import Job from apscheduler.jobstores.base import ConflictingIdError, JobLookupError from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.util import datetime_to_utc_timestamp from redbot.core import Config - # TODO: use get_lock on config from redbot.core.bot import Red +log = logging.getLogger("red.fox_v3.fifo.jobstore") +log.setLevel(logging.DEBUG) + +save_task_objects = [] class RedConfigJobStore(MemoryJobStore): def __init__(self, config: Config, bot: Red): super().__init__() self.config = config - # nest_asyncio.apply() self.bot = bot + self.pickle_protocol = pickle.HIGHEST_PROTOCOL asyncio.ensure_future(self._load_from_config(), loop=self.bot.loop) async def _load_from_config(self): self._jobs = await self.config.jobs() + self._jobs = [ + (await self._decode_job(job["job_state"]), timestamp) + for (job, timestamp) in self._jobs + ] self._jobs_index = await self.config.jobs_index.all() - - def add_job(self, job): + self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs} + + def _encode_job(self, job: Job): + log.info(f"Encoding job id: {job.id}") + job_state = job.__getstate__() + new_args = list(job_state["args"]) + new_args[0]["config"] = None + new_args[0]["bot"] = None + job_state["args"] = tuple(new_args) + encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol)) + out = { + "_id": job.id, + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": encoded.decode("ascii"), + } + new_args = list(job_state["args"]) + new_args[0]["config"] = self.config + new_args[0]["bot"] = self.bot + job_state["args"] = tuple(new_args) + log.info(f"After encode: Check job args: {job.args=}") + return out + + async def _decode_job(self, job_state): + job_state = pickle.loads(base64.b64decode(job_state)) + new_args = list(job_state["args"]) + new_args[0]["config"] = self.config + new_args[0]["bot"] = self.bot + job_state["args"] = tuple(new_args) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + # task_name, guild_id = _disassemble_job_id(job.id) + # task = Task(task_name, guild_id, self.config) + # await task.load_from_config() + # save_task_objects.append(task) + # + # job.func = task.execute + + log.info(f"Decoded job id: {job.id}") + + return job + + def add_job(self, job: Job): if job.id in self._jobs_index: raise ConflictingIdError(job.id) - + log.info(f"Check job args: {job.args=}") timestamp = datetime_to_utc_timestamp(job.next_run_time) index = self._get_job_index(timestamp, job.id) # This is fine self._jobs.insert(index, (job, timestamp)) self._jobs_index[job.id] = (job, timestamp) asyncio.create_task(self._async_add_job(job, index, timestamp)) + log.info(f"Added job: {self._jobs[index][0].args}") async def _async_add_job(self, job, index, timestamp): async with self.config.jobs() as jobs: - jobs.insert(index, (job, timestamp)) - await self.config.jobs_index.set_raw(job.id, value=(job, timestamp)) + jobs.insert(index, (self._encode_job(job), timestamp)) + await self.config.jobs_index.set_raw(job.id, value=(self._encode_job(job), timestamp)) return True def update_job(self, job): @@ -48,22 +102,29 @@ class RedConfigJobStore(MemoryJobStore): # Otherwise, reinsert the job to the list to preserve the ordering. old_index = self._get_job_index(old_timestamp, old_job.id) new_timestamp = datetime_to_utc_timestamp(job.next_run_time) - asyncio.ensure_future(self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), loop=self.bot.loop) + asyncio.ensure_future( + self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), + loop=self.bot.loop, + ) async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): + encoded_job = self._encode_job(job) if old_timestamp == new_timestamp: self._jobs[old_index] = (job, new_timestamp) async with self.config.jobs() as jobs: - jobs[old_index] = (job, new_timestamp) + jobs[old_index] = (encoded_job, new_timestamp) else: del self._jobs[old_index] new_index = self._get_job_index(new_timestamp, job.id) # This is fine self._jobs.insert(new_index, (job, new_timestamp)) async with self.config.jobs() as jobs: del jobs[old_index] - jobs.insert(new_index, (job, new_timestamp)) + jobs.insert(new_index, (encoded_job, new_timestamp)) self._jobs_index[old_job.id] = (job, new_timestamp) - await self.config.jobs_index.set_raw(old_job.id, value=(job, new_timestamp)) + await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) + + log.info(f"Async Updated {job.id=}") + log.info(f"Check job args: {job.args=}") def remove_job(self, job_id): job, timestamp = self._jobs_index.get(job_id, (None, None)) diff --git a/fifo/redjob.py b/fifo/redjob.py new file mode 100644 index 0000000..c276aa4 --- /dev/null +++ b/fifo/redjob.py @@ -0,0 +1,44 @@ +import six +from apscheduler.job import Job +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.schedulers.base import STATE_STOPPED +from apscheduler.util import undefined + + +class RedJob(Job): + pass + + +class RedAsyncIOScheduler(AsyncIOScheduler): + + def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, + misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, + next_run_time=undefined, jobstore='default', executor='default', + replace_existing=False, **trigger_args): + job_kwargs = { + 'trigger': self._create_trigger(trigger, trigger_args), + 'executor': executor, + 'func': func, + 'args': tuple(args) if args is not None else (), + 'kwargs': dict(kwargs) if kwargs is not None else {}, + 'id': id, + 'name': name, + 'misfire_grace_time': misfire_grace_time, + 'coalesce': coalesce, + 'max_instances': max_instances, + 'next_run_time': next_run_time + } + job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if + value is not undefined) + job = RedJob(self, **job_kwargs) + + # Don't really add jobs to job stores before the scheduler is up and running + with self._jobstores_lock: + if self.state == STATE_STOPPED: + self._pending_jobs.append((job, jobstore, replace_existing)) + self._logger.info('Adding job tentatively -- it will be properly scheduled when ' + 'the scheduler starts') + else: + self._real_add_job(job, jobstore, replace_existing) + + return job From 636b3ee9753ff9a22645af7ac6a7907442794d48 Mon Sep 17 00:00:00 2001 From: bobloy Date: Mon, 31 Aug 2020 07:28:10 -0400 Subject: [PATCH 04/12] Further attempts at fake message object --- fifo/fifo.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/fifo/fifo.py b/fifo/fifo.py index c5ea125..da88283 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -21,6 +21,10 @@ schedule_log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG) +async def _do_nothing(*args, **kwargs): + pass + + async def _execute_task(task_state): log.info(f"Executing {task_state=}") task = Task(**task_state) @@ -60,9 +64,9 @@ def parse_triggers(data: Union[Dict, None]): # _state = None -# class FakeMessage(discord.Message): -# def __init__(self): -# super().__init__(state=None, channel=None, data=None) +class FakeMessage: + def __init__(self, **kwargs): + self.__dict__.update(kwargs) class Task: @@ -216,11 +220,15 @@ class Task: log.warning(f"Could not execute task due to missing author: {self.author_id}") return False - message = channel.last_message - if message is None: + actual_message: discord.Message = channel.last_message + if actual_message is None: log.warning("No message found in channel cache yet, skipping execution") return + + message = FakeMessage(**actual_message.__dict__) message.author = author + message.id = None + message.add_reaction = _do_nothing prefixes = await self.bot.get_prefix(message) if isinstance(prefixes, str): From 68690473c08da148042974ce9951c22445a06e0a Mon Sep 17 00:00:00 2001 From: bobloy Date: Mon, 31 Aug 2020 17:39:08 -0400 Subject: [PATCH 05/12] More WIP fake message --- fifo/datetimeconverter.py | 2 +- fifo/fifo.py | 27 +++++++++++++++------------ 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/fifo/datetimeconverter.py b/fifo/datetimeconverter.py index bdfbf88..c3f96ee 100644 --- a/fifo/datetimeconverter.py +++ b/fifo/datetimeconverter.py @@ -10,7 +10,7 @@ if TYPE_CHECKING: else: class DatetimeConverter(Converter): async def convert(self, ctx, argument) -> datetime: - dt = parser.parse(argument) + dt = parser.parse(argument, fuzzy=True) if dt is not None: return dt raise BadArgument() diff --git a/fifo/fifo.py b/fifo/fifo.py index da88283..b65b681 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -57,16 +57,14 @@ def parse_triggers(data: Union[Dict, None]): return get_trigger(data["triggers"][0]) -# class FakeMessage(discord.Message): -# def __init__(self, *, state, channel, data): -# super().__init__(state=state, channel=channel, data=data) -# -# _state = None +class FakeMessage2(discord.Message): + __slots__ = ("__dict__",) class FakeMessage: - def __init__(self, **kwargs): - self.__dict__.update(kwargs) + def __init__(self, message: discord.Message): + d = {k: getattr(message, k) for k in dir(message)} + self.__dict__.update(**d) class Task: @@ -114,6 +112,7 @@ class Task: "hour": dt.hour, "minute": dt.minute, "second": dt.second, + "tzinfo": dt.tzinfo }, } ) @@ -225,7 +224,8 @@ class Task: log.warning("No message found in channel cache yet, skipping execution") return - message = FakeMessage(**actual_message.__dict__) + message = FakeMessage(actual_message) + # message = FakeMessage2 message.author = author message.id = None message.add_reaction = _do_nothing @@ -451,7 +451,7 @@ class FIFO(commands.Cog): @fifo_trigger.command(name="interval") async def fifo_trigger_interval( - self, ctx: commands.Context, task_name: str, interval_str: TimedeltaConverter + self, ctx: commands.Context, task_name: str, *, interval_str: TimedeltaConverter ): """ Add an interval trigger to the specified task @@ -478,7 +478,7 @@ class FIFO(commands.Cog): @fifo_trigger.command(name="date") async def fifo_trigger_date( - self, ctx: commands.Context, task_name: str, datetime_str: DatetimeConverter + self, ctx: commands.Context, task_name: str, *, datetime_str: DatetimeConverter ): """ Add a "run once" datetime trigger to the specified task @@ -501,8 +501,11 @@ class FIFO(commands.Cog): return await task.save_data() - await self._process_task(task) - await ctx.tick() + job: Job = await self._process_task(task) + await ctx.maybe_send_embed( + f"Task `{task_name}` added {datetime_str} to its scheduled runtimes\n" + f"Next run time: {job.next_run_time}" + ) @fifo_trigger.command(name="cron") async def fifo_trigger_cron( From c34929a93ed5be9ca14673e21a1da30b35d0a946 Mon Sep 17 00:00:00 2001 From: bobloy Date: Tue, 1 Sep 2020 16:47:03 -0400 Subject: [PATCH 06/12] Timezone support, working fake message --- fifo/datetimeconverter.py | 3 +- fifo/fifo.py | 120 +++++++++++++++++------ fifo/redconfigjobstore.py | 21 ++-- fifo/timezones.py | 195 +++++++++++++++++++++++++++++++++++++ infochannel/infochannel.py | 6 +- 5 files changed, 303 insertions(+), 42 deletions(-) create mode 100644 fifo/timezones.py diff --git a/fifo/datetimeconverter.py b/fifo/datetimeconverter.py index c3f96ee..def0403 100644 --- a/fifo/datetimeconverter.py +++ b/fifo/datetimeconverter.py @@ -4,13 +4,14 @@ from typing import TYPE_CHECKING from discord.ext.commands import BadArgument, Converter from dateutil import parser +from fifo.timezones import assemble_timezones if TYPE_CHECKING: DatetimeConverter = datetime else: class DatetimeConverter(Converter): async def convert(self, ctx, argument) -> datetime: - dt = parser.parse(argument, fuzzy=True) + dt = parser.parse(argument, fuzzy=True, tzinfos=assemble_timezones()) if dt is not None: return dt raise BadArgument() diff --git a/fifo/fifo.py b/fifo/fifo.py index b65b681..4c174ee 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -1,6 +1,6 @@ import logging from datetime import datetime, timedelta -from typing import Dict, Union +from typing import Dict, List, Union import discord from apscheduler.job import Job @@ -63,7 +63,7 @@ class FakeMessage2(discord.Message): class FakeMessage: def __init__(self, message: discord.Message): - d = {k: getattr(message, k) for k in dir(message)} + d = {k: getattr(message, k, None) for k in dir(message)} self.__dict__.update(**d) @@ -102,20 +102,21 @@ class Task: if t["type"] == "date": # Convert into datetime dt: datetime = t["time_data"] - triggers.append( - { - "type": t["type"], - "time_data": { - "year": dt.year, - "month": dt.month, - "day": dt.day, - "hour": dt.hour, - "minute": dt.minute, - "second": dt.second, - "tzinfo": dt.tzinfo - }, - } - ) + triggers.append({"type": t["type"], "time_data": dt.isoformat()}) + # triggers.append( + # { + # "type": t["type"], + # "time_data": { + # "year": dt.year, + # "month": dt.month, + # "day": dt.day, + # "hour": dt.hour, + # "minute": dt.minute, + # "second": dt.second, + # "tzinfo": dt.tzinfo, + # }, + # } + # ) continue if t["type"] == "cron": @@ -134,7 +135,8 @@ class Task: continue if t["type"] == "date": # Convert into datetime - self.data["triggers"][n]["time_data"] = datetime(**t["time_data"]) + # self.data["triggers"][n]["time_data"] = datetime(**t["time_data"]) + self.data["triggers"][n]["time_data"] = datetime.fromisoformat(t["time_data"]) continue if t["type"] == "cron": @@ -161,7 +163,16 @@ class Task: await self._decode_time_triggers() return self.data - async def get_trigger(self) -> Union[BaseTrigger, None]: + async def get_triggers(self) -> List[Union[IntervalTrigger, DateTrigger]]: + if not self.data: + await self.load_from_config() + + if self.data is None or "triggers" not in self.data: # No triggers + return [] + + return [get_trigger(t) for t in self.data["triggers"]] + + async def get_combined_trigger(self) -> Union[BaseTrigger, None]: if not self.data: await self.load_from_config() @@ -178,7 +189,7 @@ class Task: data_to_save = self.default_task_data.copy() if self.data: - data_to_save["command_str"] = self.data.get("command_str", "") + data_to_save["command_str"] = self.get_command_str() data_to_save["triggers"] = await self._encode_time_triggers() to_save = { @@ -202,7 +213,7 @@ class Task: ) async def execute(self): - if not self.data or not self.data.get("command_str", False): + if not self.data or not self.get_command_str(): log.warning(f"Could not execute task due to data problem: {self.data=}") return False @@ -236,7 +247,7 @@ class Task: else: prefix = prefixes[0] - message.content = f"{prefix}{self.data['command_str']}" + message.content = f"{prefix}{self.get_command_str()}" if not message.guild or not message.author or not message.content: log.warning(f"Could not execute task due to message problem: {message}") @@ -257,6 +268,9 @@ class Task: async def set_author(self, author: Union[discord.User, str]): self.author_id = getattr(author, "id", None) or author + def get_command_str(self): + return self.data.get("command_str", "") + async def set_commmand_str(self, command_str): if not self.data: self.data = self.default_task_data.copy() @@ -348,10 +362,10 @@ class FIFO(commands.Cog): return new_ctx.valid async def _process_task(self, task: Task): - job = await self._get_job(task) + job: Union[Job, None] = await self._get_job(task) if job is not None: - job.remove() - + job.reschedule(await task.get_combined_trigger()) + return job return await self._add_job(task) async def _get_job(self, task: Task) -> Job: @@ -362,7 +376,7 @@ class FIFO(commands.Cog): _execute_task, args=[task.__getstate__()], id=_assemble_job_id(task.name, task.guild_id), - trigger=await task.get_trigger(), + trigger=await task.get_combined_trigger(), ) async def _pause_job(self, task: Task): @@ -375,10 +389,10 @@ class FIFO(commands.Cog): @commands.command() async def fifoclear(self, ctx: commands.Context): """Debug command to clear all current fifo data""" + self.scheduler.remove_all_jobs() await self.config.guild(ctx.guild).tasks.clear() await self.config.jobs.clear() await self.config.jobs_index.clear() - self.scheduler.remove_all_jobs() await ctx.tick() @checks.is_owner() # Will be reduced when I figure out permissions later @@ -390,6 +404,47 @@ class FIFO(commands.Cog): if ctx.invoked_subcommand is None: pass + @fifo.command(name="details") + async def fifo_details(self, ctx: commands.Context, task_name: str): + """ + Provide all the details on the specified task name + + """ + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + embed = discord.Embed(title=task_name) + + embed.add_field( + name="Task command", value=f"{ctx.prefix}{task.get_command_str()}", inline=False + ) + + guild: discord.Guild = self.bot.get_guild(task.guild_id) + + if guild is not None: + author: discord.Member = guild.get_member(task.author_id) + channel: discord.TextChannel = guild.get_channel(task.channel_id) + embed.add_field(name="Server", value=guild.name) + if author is not None: + embed.add_field(name="Author", value=author.mention) + if channel is not None: + embed.add_field(name="Channel", value=channel.mention) + + else: + embed.add_field(name="Server", value="Server not found") + + trigger_str = "\n".join(str(t) for t in await task.get_triggers()) + if trigger_str: + embed.add_field(name="Triggers", value=trigger_str, inline=False) + + await ctx.send(embed=embed) + @fifo.command(name="list") async def fifo_list(self, ctx: commands.Context, all_guilds: bool = False): """ @@ -473,8 +528,12 @@ class FIFO(commands.Cog): ) return await task.save_data() - await self._process_task(task) - await ctx.tick() + job: Job = await self._process_task(task) + delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo) + await ctx.maybe_send_embed( + f"Task `{task_name}` added interval of {interval_str} to its scheduled runtimes\n" + f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)" + ) @fifo_trigger.command(name="date") async def fifo_trigger_date( @@ -502,9 +561,10 @@ class FIFO(commands.Cog): await task.save_data() job: Job = await self._process_task(task) + delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo) await ctx.maybe_send_embed( f"Task `{task_name}` added {datetime_str} to its scheduled runtimes\n" - f"Next run time: {job.next_run_time}" + f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)" ) @fifo_trigger.command(name="cron") @@ -527,7 +587,7 @@ class FIFO(commands.Cog): # await task.load_from_data(task_data) # # job = self.scheduler.add_job( - # task.execute, id=task_name + "_" + guild_id, trigger=await task.get_trigger(), + # task.execute, id=task_name + "_" + guild_id, trigger=await task.get_combined_trigger(), # ) # # self.scheduler.start() diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index 4921dca..cbbe0ab 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -2,6 +2,8 @@ import asyncio import base64 import logging import pickle +from datetime import datetime +from typing import Tuple, Union from apscheduler.job import Job from apscheduler.jobstores.base import ConflictingIdError, JobLookupError @@ -36,7 +38,7 @@ class RedConfigJobStore(MemoryJobStore): self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs} def _encode_job(self, job: Job): - log.info(f"Encoding job id: {job.id}") + # log.debug(f"Encoding job id: {job.id}") job_state = job.__getstate__() new_args = list(job_state["args"]) new_args[0]["config"] = None @@ -52,7 +54,6 @@ class RedConfigJobStore(MemoryJobStore): new_args[0]["config"] = self.config new_args[0]["bot"] = self.bot job_state["args"] = tuple(new_args) - log.info(f"After encode: Check job args: {job.args=}") return out async def _decode_job(self, job_state): @@ -72,20 +73,20 @@ class RedConfigJobStore(MemoryJobStore): # # job.func = task.execute - log.info(f"Decoded job id: {job.id}") + # log.debug(f"Decoded job id: {job.id}") return job def add_job(self, job: Job): if job.id in self._jobs_index: raise ConflictingIdError(job.id) - log.info(f"Check job args: {job.args=}") + # log.debug(f"Check job args: {job.args=}") timestamp = datetime_to_utc_timestamp(job.next_run_time) index = self._get_job_index(timestamp, job.id) # This is fine self._jobs.insert(index, (job, timestamp)) self._jobs_index[job.id] = (job, timestamp) asyncio.create_task(self._async_add_job(job, index, timestamp)) - log.info(f"Added job: {self._jobs[index][0].args}") + # log.debug(f"Added job: {self._jobs[index][0].args}") async def _async_add_job(self, job, index, timestamp): async with self.config.jobs() as jobs: @@ -94,7 +95,11 @@ class RedConfigJobStore(MemoryJobStore): return True def update_job(self, job): - old_job, old_timestamp = self._jobs_index.get(job.id, (None, None)) + old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get( + job.id, (None, None) + ) + old_job = old_tuple[0] + old_timestamp = old_tuple[1] if old_job is None: raise JobLookupError(job.id) @@ -123,8 +128,8 @@ class RedConfigJobStore(MemoryJobStore): self._jobs_index[old_job.id] = (job, new_timestamp) await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) - log.info(f"Async Updated {job.id=}") - log.info(f"Check job args: {job.args=}") + log.debug(f"Async Updated {job.id=}") + log.debug(f"Check job args: {job.args=}") def remove_job(self, job_id): job, timestamp = self._jobs_index.get(job_id, (None, None)) diff --git a/fifo/timezones.py b/fifo/timezones.py new file mode 100644 index 0000000..5a322a4 --- /dev/null +++ b/fifo/timezones.py @@ -0,0 +1,195 @@ +""" +Timezone information for the dateutil parser + +All credit to https://github.com/prefrontal/dateutil-parser-timezones +""" + +from dateutil.tz import gettz + + +def assemble_timezones(): + """ + Assembles a dictionary of timezone abbreviations and values + :return: Dictionary of abbreviation keys and timezone values + """ + timezones = {} + + timezones['ACDT'] = gettz('Australia/Darwin') # Australian Central Daylight Savings Time (UTC+10:30) + timezones['ACST'] = gettz('Australia/Darwin') # Australian Central Standard Time (UTC+09:30) + timezones['ACT'] = gettz('Brazil/Acre') # Acre Time (UTC−05) + timezones['ADT'] = gettz('America/Halifax') # Atlantic Daylight Time (UTC−03) + timezones['AEDT'] = gettz('Australia/Sydney') # Australian Eastern Daylight Savings Time (UTC+11) + timezones['AEST'] = gettz('Australia/Sydney') # Australian Eastern Standard Time (UTC+10) + timezones['AFT'] = gettz('Asia/Kabul') # Afghanistan Time (UTC+04:30) + timezones['AKDT'] = gettz('America/Juneau') # Alaska Daylight Time (UTC−08) + timezones['AKST'] = gettz('America/Juneau') # Alaska Standard Time (UTC−09) + timezones['AMST'] = gettz('America/Manaus') # Amazon Summer Time (Brazil)[1] (UTC−03) + timezones['AMT'] = gettz('America/Manaus') # Amazon Time (Brazil)[2] (UTC−04) + timezones['ART'] = gettz('America/Cordoba') # Argentina Time (UTC−03) + timezones['AST'] = gettz('Asia/Riyadh') # Arabia Standard Time (UTC+03) + timezones['AWST'] = gettz('Australia/Perth') # Australian Western Standard Time (UTC+08) + timezones['AZOST'] = gettz('Atlantic/Azores') # Azores Summer Time (UTC±00) + timezones['AZOT'] = gettz('Atlantic/Azores') # Azores Standard Time (UTC−01) + timezones['AZT'] = gettz('Asia/Baku') # Azerbaijan Time (UTC+04) + timezones['BDT'] = gettz('Asia/Brunei') # Brunei Time (UTC+08) + timezones['BIOT'] = gettz('Etc/GMT+6') # British Indian Ocean Time (UTC+06) + timezones['BIT'] = gettz('Pacific/Funafuti') # Baker Island Time (UTC−12) + timezones['BOT'] = gettz('America/La_Paz') # Bolivia Time (UTC−04) + timezones['BRST'] = gettz('America/Sao_Paulo') # Brasilia Summer Time (UTC−02) + timezones['BRT'] = gettz('America/Sao_Paulo') # Brasilia Time (UTC−03) + timezones['BST'] = gettz('Asia/Dhaka') # Bangladesh Standard Time (UTC+06) + timezones['BTT'] = gettz('Asia/Thimphu') # Bhutan Time (UTC+06) + timezones['CAT'] = gettz('Africa/Harare') # Central Africa Time (UTC+02) + timezones['CCT'] = gettz('Indian/Cocos') # Cocos Islands Time (UTC+06:30) + timezones['CDT'] = gettz('America/Chicago') # Central Daylight Time (North America) (UTC−05) + timezones['CEST'] = gettz('Europe/Berlin') # Central European Summer Time (Cf. HAEC) (UTC+02) + timezones['CET'] = gettz('Europe/Berlin') # Central European Time (UTC+01) + timezones['CHADT'] = gettz('Pacific/Chatham') # Chatham Daylight Time (UTC+13:45) + timezones['CHAST'] = gettz('Pacific/Chatham') # Chatham Standard Time (UTC+12:45) + timezones['CHOST'] = gettz('Asia/Choibalsan') # Choibalsan Summer Time (UTC+09) + timezones['CHOT'] = gettz('Asia/Choibalsan') # Choibalsan Standard Time (UTC+08) + timezones['CHST'] = gettz('Pacific/Guam') # Chamorro Standard Time (UTC+10) + timezones['CHUT'] = gettz('Pacific/Chuuk') # Chuuk Time (UTC+10) + timezones['CIST'] = gettz('Etc/GMT-8') # Clipperton Island Standard Time (UTC−08) + timezones['CIT'] = gettz('Asia/Makassar') # Central Indonesia Time (UTC+08) + timezones['CKT'] = gettz('Pacific/Rarotonga') # Cook Island Time (UTC−10) + timezones['CLST'] = gettz('America/Santiago') # Chile Summer Time (UTC−03) + timezones['CLT'] = gettz('America/Santiago') # Chile Standard Time (UTC−04) + timezones['COST'] = gettz('America/Bogota') # Colombia Summer Time (UTC−04) + timezones['COT'] = gettz('America/Bogota') # Colombia Time (UTC−05) + timezones['CST'] = gettz('America/Chicago') # Central Standard Time (North America) (UTC−06) + timezones['CT'] = gettz('Asia/Chongqing') # China time (UTC+08) + timezones['CVT'] = gettz('Atlantic/Cape_Verde') # Cape Verde Time (UTC−01) + timezones['CXT'] = gettz('Indian/Christmas') # Christmas Island Time (UTC+07) + timezones['DAVT'] = gettz('Antarctica/Davis') # Davis Time (UTC+07) + timezones['DDUT'] = gettz('Antarctica/DumontDUrville') # Dumont d'Urville Time (UTC+10) + timezones['DFT'] = gettz('Europe/Berlin') # AIX equivalent of Central European Time (UTC+01) + timezones['EASST'] = gettz('Chile/EasterIsland') # Easter Island Summer Time (UTC−05) + timezones['EAST'] = gettz('Chile/EasterIsland') # Easter Island Standard Time (UTC−06) + timezones['EAT'] = gettz('Africa/Mogadishu') # East Africa Time (UTC+03) + timezones['ECT'] = gettz('America/Guayaquil') # Ecuador Time (UTC−05) + timezones['EDT'] = gettz('America/New_York') # Eastern Daylight Time (North America) (UTC−04) + timezones['EEST'] = gettz('Europe/Bucharest') # Eastern European Summer Time (UTC+03) + timezones['EET'] = gettz('Europe/Bucharest') # Eastern European Time (UTC+02) + timezones['EGST'] = gettz('America/Scoresbysund') # Eastern Greenland Summer Time (UTC±00) + timezones['EGT'] = gettz('America/Scoresbysund') # Eastern Greenland Time (UTC−01) + timezones['EIT'] = gettz('Asia/Jayapura') # Eastern Indonesian Time (UTC+09) + timezones['EST'] = gettz('America/New_York') # Eastern Standard Time (North America) (UTC−05) + timezones['FET'] = gettz('Europe/Minsk') # Further-eastern European Time (UTC+03) + timezones['FJT'] = gettz('Pacific/Fiji') # Fiji Time (UTC+12) + timezones['FKST'] = gettz('Atlantic/Stanley') # Falkland Islands Summer Time (UTC−03) + timezones['FKT'] = gettz('Atlantic/Stanley') # Falkland Islands Time (UTC−04) + timezones['FNT'] = gettz('Brazil/DeNoronha') # Fernando de Noronha Time (UTC−02) + timezones['GALT'] = gettz('Pacific/Galapagos') # Galapagos Time (UTC−06) + timezones['GAMT'] = gettz('Pacific/Gambier') # Gambier Islands (UTC−09) + timezones['GET'] = gettz('Asia/Tbilisi') # Georgia Standard Time (UTC+04) + timezones['GFT'] = gettz('America/Cayenne') # French Guiana Time (UTC−03) + timezones['GILT'] = gettz('Pacific/Tarawa') # Gilbert Island Time (UTC+12) + timezones['GIT'] = gettz('Pacific/Gambier') # Gambier Island Time (UTC−09) + timezones['GMT'] = gettz('GMT') # Greenwich Mean Time (UTC±00) + timezones['GST'] = gettz('Asia/Muscat') # Gulf Standard Time (UTC+04) + timezones['GYT'] = gettz('America/Guyana') # Guyana Time (UTC−04) + timezones['HADT'] = gettz('Pacific/Honolulu') # Hawaii-Aleutian Daylight Time (UTC−09) + timezones['HAEC'] = gettz('Europe/Paris') # Heure Avancée d'Europe Centrale (CEST) (UTC+02) + timezones['HAST'] = gettz('Pacific/Honolulu') # Hawaii-Aleutian Standard Time (UTC−10) + timezones['HKT'] = gettz('Asia/Hong_Kong') # Hong Kong Time (UTC+08) + timezones['HMT'] = gettz('Indian/Kerguelen') # Heard and McDonald Islands Time (UTC+05) + timezones['HOVST'] = gettz('Asia/Hovd') # Khovd Summer Time (UTC+08) + timezones['HOVT'] = gettz('Asia/Hovd') # Khovd Standard Time (UTC+07) + timezones['ICT'] = gettz('Asia/Ho_Chi_Minh') # Indochina Time (UTC+07) + timezones['IDT'] = gettz('Asia/Jerusalem') # Israel Daylight Time (UTC+03) + timezones['IOT'] = gettz('Etc/GMT+3') # Indian Ocean Time (UTC+03) + timezones['IRDT'] = gettz('Asia/Tehran') # Iran Daylight Time (UTC+04:30) + timezones['IRKT'] = gettz('Asia/Irkutsk') # Irkutsk Time (UTC+08) + timezones['IRST'] = gettz('Asia/Tehran') # Iran Standard Time (UTC+03:30) + timezones['IST'] = gettz('Asia/Kolkata') # Indian Standard Time (UTC+05:30) + timezones['JST'] = gettz('Asia/Tokyo') # Japan Standard Time (UTC+09) + timezones['KGT'] = gettz('Asia/Bishkek') # Kyrgyzstan time (UTC+06) + timezones['KOST'] = gettz('Pacific/Kosrae') # Kosrae Time (UTC+11) + timezones['KRAT'] = gettz('Asia/Krasnoyarsk') # Krasnoyarsk Time (UTC+07) + timezones['KST'] = gettz('Asia/Seoul') # Korea Standard Time (UTC+09) + timezones['LHST'] = gettz('Australia/Lord_Howe') # Lord Howe Standard Time (UTC+10:30) + timezones['LINT'] = gettz('Pacific/Kiritimati') # Line Islands Time (UTC+14) + timezones['MAGT'] = gettz('Asia/Magadan') # Magadan Time (UTC+12) + timezones['MART'] = gettz('Pacific/Marquesas') # Marquesas Islands Time (UTC−09:30) + timezones['MAWT'] = gettz('Antarctica/Mawson') # Mawson Station Time (UTC+05) + timezones['MDT'] = gettz('America/Denver') # Mountain Daylight Time (North America) (UTC−06) + timezones['MEST'] = gettz('Europe/Paris') # Middle European Summer Time Same zone as CEST (UTC+02) + timezones['MET'] = gettz('Europe/Berlin') # Middle European Time Same zone as CET (UTC+01) + timezones['MHT'] = gettz('Pacific/Kwajalein') # Marshall Islands (UTC+12) + timezones['MIST'] = gettz('Antarctica/Macquarie') # Macquarie Island Station Time (UTC+11) + timezones['MIT'] = gettz('Pacific/Marquesas') # Marquesas Islands Time (UTC−09:30) + timezones['MMT'] = gettz('Asia/Rangoon') # Myanmar Standard Time (UTC+06:30) + timezones['MSK'] = gettz('Europe/Moscow') # Moscow Time (UTC+03) + timezones['MST'] = gettz('America/Denver') # Mountain Standard Time (North America) (UTC−07) + timezones['MUT'] = gettz('Indian/Mauritius') # Mauritius Time (UTC+04) + timezones['MVT'] = gettz('Indian/Maldives') # Maldives Time (UTC+05) + timezones['MYT'] = gettz('Asia/Kuching') # Malaysia Time (UTC+08) + timezones['NCT'] = gettz('Pacific/Noumea') # New Caledonia Time (UTC+11) + timezones['NDT'] = gettz('Canada/Newfoundland') # Newfoundland Daylight Time (UTC−02:30) + timezones['NFT'] = gettz('Pacific/Norfolk') # Norfolk Time (UTC+11) + timezones['NPT'] = gettz('Asia/Kathmandu') # Nepal Time (UTC+05:45) + timezones['NST'] = gettz('Canada/Newfoundland') # Newfoundland Standard Time (UTC−03:30) + timezones['NT'] = gettz('Canada/Newfoundland') # Newfoundland Time (UTC−03:30) + timezones['NUT'] = gettz('Pacific/Niue') # Niue Time (UTC−11) + timezones['NZDT'] = gettz('Pacific/Auckland') # New Zealand Daylight Time (UTC+13) + timezones['NZST'] = gettz('Pacific/Auckland') # New Zealand Standard Time (UTC+12) + timezones['OMST'] = gettz('Asia/Omsk') # Omsk Time (UTC+06) + timezones['ORAT'] = gettz('Asia/Oral') # Oral Time (UTC+05) + timezones['PDT'] = gettz('America/Los_Angeles') # Pacific Daylight Time (North America) (UTC−07) + timezones['PET'] = gettz('America/Lima') # Peru Time (UTC−05) + timezones['PETT'] = gettz('Asia/Kamchatka') # Kamchatka Time (UTC+12) + timezones['PGT'] = gettz('Pacific/Port_Moresby') # Papua New Guinea Time (UTC+10) + timezones['PHOT'] = gettz('Pacific/Enderbury') # Phoenix Island Time (UTC+13) + timezones['PKT'] = gettz('Asia/Karachi') # Pakistan Standard Time (UTC+05) + timezones['PMDT'] = gettz('America/Miquelon') # Saint Pierre and Miquelon Daylight time (UTC−02) + timezones['PMST'] = gettz('America/Miquelon') # Saint Pierre and Miquelon Standard Time (UTC−03) + timezones['PONT'] = gettz('Pacific/Pohnpei') # Pohnpei Standard Time (UTC+11) + timezones['PST'] = gettz('America/Los_Angeles') # Pacific Standard Time (North America) (UTC−08) + timezones['PYST'] = gettz('America/Asuncion') # Paraguay Summer Time (South America)[7] (UTC−03) + timezones['PYT'] = gettz('America/Asuncion') # Paraguay Time (South America)[8] (UTC−04) + timezones['RET'] = gettz('Indian/Reunion') # Réunion Time (UTC+04) + timezones['ROTT'] = gettz('Antarctica/Rothera') # Rothera Research Station Time (UTC−03) + timezones['SAKT'] = gettz('Asia/Vladivostok') # Sakhalin Island time (UTC+11) + timezones['SAMT'] = gettz('Europe/Samara') # Samara Time (UTC+04) + timezones['SAST'] = gettz('Africa/Johannesburg') # South African Standard Time (UTC+02) + timezones['SBT'] = gettz('Pacific/Guadalcanal') # Solomon Islands Time (UTC+11) + timezones['SCT'] = gettz('Indian/Mahe') # Seychelles Time (UTC+04) + timezones['SGT'] = gettz('Asia/Singapore') # Singapore Time (UTC+08) + timezones['SLST'] = gettz('Asia/Colombo') # Sri Lanka Standard Time (UTC+05:30) + timezones['SRET'] = gettz('Asia/Srednekolymsk') # Srednekolymsk Time (UTC+11) + timezones['SRT'] = gettz('America/Paramaribo') # Suriname Time (UTC−03) + timezones['SST'] = gettz('Asia/Singapore') # Singapore Standard Time (UTC+08) + timezones['SYOT'] = gettz('Antarctica/Syowa') # Showa Station Time (UTC+03) + timezones['TAHT'] = gettz('Pacific/Tahiti') # Tahiti Time (UTC−10) + timezones['TFT'] = gettz('Indian/Kerguelen') # Indian/Kerguelen (UTC+05) + timezones['THA'] = gettz('Asia/Bangkok') # Thailand Standard Time (UTC+07) + timezones['TJT'] = gettz('Asia/Dushanbe') # Tajikistan Time (UTC+05) + timezones['TKT'] = gettz('Pacific/Fakaofo') # Tokelau Time (UTC+13) + timezones['TLT'] = gettz('Asia/Dili') # Timor Leste Time (UTC+09) + timezones['TMT'] = gettz('Asia/Ashgabat') # Turkmenistan Time (UTC+05) + timezones['TOT'] = gettz('Pacific/Tongatapu') # Tonga Time (UTC+13) + timezones['TVT'] = gettz('Pacific/Funafuti') # Tuvalu Time (UTC+12) + timezones['ULAST'] = gettz('Asia/Ulan_Bator') # Ulaanbaatar Summer Time (UTC+09) + timezones['ULAT'] = gettz('Asia/Ulan_Bator') # Ulaanbaatar Standard Time (UTC+08) + timezones['USZ1'] = gettz('Europe/Kaliningrad') # Kaliningrad Time (UTC+02) + timezones['UTC'] = gettz('UTC') # Coordinated Universal Time (UTC±00) + timezones['UYST'] = gettz('America/Montevideo') # Uruguay Summer Time (UTC−02) + timezones['UYT'] = gettz('America/Montevideo') # Uruguay Standard Time (UTC−03) + timezones['UZT'] = gettz('Asia/Tashkent') # Uzbekistan Time (UTC+05) + timezones['VET'] = gettz('America/Caracas') # Venezuelan Standard Time (UTC−04) + timezones['VLAT'] = gettz('Asia/Vladivostok') # Vladivostok Time (UTC+10) + timezones['VOLT'] = gettz('Europe/Volgograd') # Volgograd Time (UTC+04) + timezones['VOST'] = gettz('Antarctica/Vostok') # Vostok Station Time (UTC+06) + timezones['VUT'] = gettz('Pacific/Efate') # Vanuatu Time (UTC+11) + timezones['WAKT'] = gettz('Pacific/Wake') # Wake Island Time (UTC+12) + timezones['WAST'] = gettz('Africa/Lagos') # West Africa Summer Time (UTC+02) + timezones['WAT'] = gettz('Africa/Lagos') # West Africa Time (UTC+01) + timezones['WEST'] = gettz('Europe/London') # Western European Summer Time (UTC+01) + timezones['WET'] = gettz('Europe/London') # Western European Time (UTC±00) + timezones['WIT'] = gettz('Asia/Jakarta') # Western Indonesian Time (UTC+07) + timezones['WST'] = gettz('Australia/Perth') # Western Standard Time (UTC+08) + timezones['YAKT'] = gettz('Asia/Yakutsk') # Yakutsk Time (UTC+09) + timezones['YEKT'] = gettz('Asia/Yekaterinburg') # Yekaterinburg Time (UTC+05) + + return timezones \ No newline at end of file diff --git a/infochannel/infochannel.py b/infochannel/infochannel.py index c7297b8..b8d36a3 100644 --- a/infochannel/infochannel.py +++ b/infochannel/infochannel.py @@ -1,4 +1,5 @@ import asyncio +from typing import Union import discord from redbot.core import Config, checks, commands @@ -61,10 +62,9 @@ class InfoChannel(Cog): guild: discord.Guild = ctx.guild channel_id = await self.config.guild(guild).channel_id() + channel = None if channel_id is not None: - channel: discord.VoiceChannel = guild.get_channel(channel_id) - else: - channel: discord.VoiceChannel = None + channel: Union[discord.VoiceChannel, None] = guild.get_channel(channel_id) if channel_id is not None and channel is None: await ctx.send("Info channel has been deleted, recreate it?") From f24183d4f238b4ee32aca4d63aa727f2e072704f Mon Sep 17 00:00:00 2001 From: bobloy Date: Tue, 1 Sep 2020 17:42:13 -0400 Subject: [PATCH 07/12] Pausing and resuming, discord ID --- fifo/fifo.py | 74 ++++++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/fifo/fifo.py b/fifo/fifo.py index 4c174ee..e210f90 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -1,14 +1,17 @@ import logging from datetime import datetime, timedelta -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union import discord from apscheduler.job import Job +from apscheduler.jobstores.base import JobLookupError from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING from apscheduler.triggers.base import BaseTrigger from apscheduler.triggers.combining import OrTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger +from discord.utils import time_snowflake from redbot.core import Config, checks, commands from redbot.core.bot import Red from redbot.core.commands import DictConverter, TimedeltaConverter @@ -238,7 +241,7 @@ class Task: message = FakeMessage(actual_message) # message = FakeMessage2 message.author = author - message.id = None + message.id = time_snowflake(datetime.now()) # Pretend to be now message.add_reaction = _do_nothing prefixes = await self.bot.get_prefix(message) @@ -345,12 +348,16 @@ class FIFO(commands.Cog): jobstores=jobstores, job_defaults=job_defaults, logger=schedule_log ) - self.scheduler.start() + self.scheduler.start() # TODO: Jobs are not receiving next_run_times async def red_delete_data_for_user(self, **kwargs): """Nothing to delete""" return + def cog_unload(self): + # self.scheduler.remove_all_jobs() + self.scheduler.shutdown() + async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str): message: discord.Message = ctx.message @@ -379,6 +386,13 @@ class FIFO(commands.Cog): trigger=await task.get_combined_trigger(), ) + async def _resume_job(self, task: Task): + try: + job = self.scheduler.resume_job(job_id=_assemble_job_id(task.name, task.guild_id)) + except JobLookupError: + job = await self._process_task(task) + return job + async def _pause_job(self, task: Task): return self.scheduler.pause_job(job_id=_assemble_job_id(task.name, task.guild_id)) @@ -404,6 +418,52 @@ class FIFO(commands.Cog): if ctx.invoked_subcommand is None: pass + @fifo.command(name="resume") + async def fifo_resume(self, ctx: commands.Context, task_name: Optional[str] = None): + if task_name is None: + if self.scheduler.state == STATE_PAUSED: + self.scheduler.resume() + await ctx.maybe_send_embed("All task execution for all guilds has been resumed") + else: + await ctx.maybe_send_embed("Task execution is not paused, can't resume") + else: + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + if await self._resume_job(task): + await ctx.maybe_send_embed(f"Execution of {task_name=} has been resumed") + else: + await ctx.maybe_send_embed(f"Failed to resume {task_name=}") + + @fifo.command(name="pause") + async def fifo_pause(self, ctx: commands.Context, task_name: Optional[str] = None): + if task_name is None: + if self.scheduler.state == STATE_RUNNING: + self.scheduler.pause() + await ctx.maybe_send_embed("All task execution for all guilds has been paused") + else: + await ctx.maybe_send_embed("Task execution is not running, can't pause") + else: + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + if await self._pause_job(task): + await ctx.maybe_send_embed(f"Execution of {task_name=} has been paused") + else: + await ctx.maybe_send_embed(f"Failed to pause {task_name=}") + @fifo.command(name="details") async def fifo_details(self, ctx: commands.Context, task_name: str): """ @@ -419,7 +479,7 @@ class FIFO(commands.Cog): ) return - embed = discord.Embed(title=task_name) + embed = discord.Embed(title=f"Task: {task_name}") embed.add_field( name="Task command", value=f"{ctx.prefix}{task.get_command_str()}", inline=False @@ -437,12 +497,16 @@ class FIFO(commands.Cog): embed.add_field(name="Channel", value=channel.mention) else: - embed.add_field(name="Server", value="Server not found") + embed.add_field(name="Server", value="Server not found", inline=False) trigger_str = "\n".join(str(t) for t in await task.get_triggers()) if trigger_str: embed.add_field(name="Triggers", value=trigger_str, inline=False) + job = await self._get_job(task) + if job and job.next_run_time: + embed.timestamp = job.next_run_time + await ctx.send(embed=embed) @fifo.command(name="list") From e1d314cc83bd09a4b19708d03f9cccfdda353491 Mon Sep 17 00:00:00 2001 From: bobloy Date: Wed, 2 Sep 2020 17:17:29 -0400 Subject: [PATCH 08/12] Start of Cron, fixed jobstore, add pause and resume, split task, --- fifo/__init__.py | 5 + ...nverter.py => datetime_cron_converters.py} | 11 + fifo/fifo.py | 415 ++++-------------- fifo/redconfigjobstore.py | 57 ++- fifo/task.py | 325 ++++++++++++++ 5 files changed, 475 insertions(+), 338 deletions(-) rename fifo/{datetimeconverter.py => datetime_cron_converters.py} (61%) create mode 100644 fifo/task.py diff --git a/fifo/__init__.py b/fifo/__init__.py index 860ab97..34cfd7b 100644 --- a/fifo/__init__.py +++ b/fifo/__init__.py @@ -4,3 +4,8 @@ from .fifo import FIFO async def setup(bot): cog = FIFO(bot) bot.add_cog(cog) + await cog.initialize() + + +def teardown(bot): + pass diff --git a/fifo/datetimeconverter.py b/fifo/datetime_cron_converters.py similarity index 61% rename from fifo/datetimeconverter.py rename to fifo/datetime_cron_converters.py index def0403..5382b07 100644 --- a/fifo/datetimeconverter.py +++ b/fifo/datetime_cron_converters.py @@ -1,6 +1,7 @@ from datetime import datetime from typing import TYPE_CHECKING +from apscheduler.triggers.cron import CronTrigger from discord.ext.commands import BadArgument, Converter from dateutil import parser @@ -8,6 +9,7 @@ from fifo.timezones import assemble_timezones if TYPE_CHECKING: DatetimeConverter = datetime + CronConverter = str else: class DatetimeConverter(Converter): async def convert(self, ctx, argument) -> datetime: @@ -15,3 +17,12 @@ else: if dt is not None: return dt raise BadArgument() + + class CronConverter(Converter): + async def convert(self, ctx, argument) -> str: + try: + CronTrigger.from_crontab(argument) + except ValueError: + raise BadArgument() + + return argument \ No newline at end of file diff --git a/fifo/fifo.py b/fifo/fifo.py index e210f90..10b72af 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -1,31 +1,23 @@ import logging from datetime import datetime, timedelta -from typing import Dict, List, Optional, Union +from typing import Optional, Union import discord from apscheduler.job import Job from apscheduler.jobstores.base import JobLookupError from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING -from apscheduler.triggers.base import BaseTrigger -from apscheduler.triggers.combining import OrTrigger -from apscheduler.triggers.date import DateTrigger -from apscheduler.triggers.interval import IntervalTrigger -from discord.utils import time_snowflake from redbot.core import Config, checks, commands from redbot.core.bot import Red from redbot.core.commands import DictConverter, TimedeltaConverter -from .datetimeconverter import DatetimeConverter +from .datetime_cron_converters import CronConverter, DatetimeConverter +from .task import Task -log = logging.getLogger("red.fox_v3.fifo") schedule_log = logging.getLogger("red.fox_v3.fifo.scheduler") schedule_log.setLevel(logging.DEBUG) -log.setLevel(logging.DEBUG) - -async def _do_nothing(*args, **kwargs): - pass +log = logging.getLogger("red.fox_v3.fifo") async def _execute_task(task_state): @@ -36,279 +28,6 @@ async def _execute_task(task_state): return False -def get_trigger(data): - if data["type"] == "interval": - parsed_time = data["time_data"] - return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds) - - if data["type"] == "date": - return DateTrigger(data["time_data"]) - - if data["type"] == "cron": - return None # TODO: Cron parsing - - return False - - -def parse_triggers(data: Union[Dict, None]): - if data is None or not data.get("triggers", False): # No triggers - return None - - if len(data["triggers"]) > 1: # Multiple triggers - return OrTrigger(get_trigger(t_data) for t_data in data["triggers"]) - - return get_trigger(data["triggers"][0]) - - -class FakeMessage2(discord.Message): - __slots__ = ("__dict__",) - - -class FakeMessage: - def __init__(self, message: discord.Message): - d = {k: getattr(message, k, None) for k in dir(message)} - self.__dict__.update(**d) - - -class Task: - default_task_data = {"triggers": [], "command_str": ""} - - default_trigger = { - "type": "", - "time_data": None, # Used for Interval and Date Triggers - } - - def __init__( - self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None - ): - self.name = name - self.guild_id = guild_id - self.config = config - self.bot = bot - self.author_id = author_id - self.channel_id = channel_id - self.data = None - - async def _encode_time_triggers(self): - if not self.data or not self.data.get("triggers", None): - return [] - - triggers = [] - for t in self.data["triggers"]: - if t["type"] == "interval": # Convert into timedelta - td: timedelta = t["time_data"] - - triggers.append( - {"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}} - ) - continue - - if t["type"] == "date": # Convert into datetime - dt: datetime = t["time_data"] - triggers.append({"type": t["type"], "time_data": dt.isoformat()}) - # triggers.append( - # { - # "type": t["type"], - # "time_data": { - # "year": dt.year, - # "month": dt.month, - # "day": dt.day, - # "hour": dt.hour, - # "minute": dt.minute, - # "second": dt.second, - # "tzinfo": dt.tzinfo, - # }, - # } - # ) - continue - - if t["type"] == "cron": - raise NotImplemented - raise NotImplemented - - return triggers - - async def _decode_time_triggers(self): - if not self.data or not self.data.get("triggers", None): - return - - for n, t in enumerate(self.data["triggers"]): - if t["type"] == "interval": # Convert into timedelta - self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"]) - continue - - if t["type"] == "date": # Convert into datetime - # self.data["triggers"][n]["time_data"] = datetime(**t["time_data"]) - self.data["triggers"][n]["time_data"] = datetime.fromisoformat(t["time_data"]) - continue - - if t["type"] == "cron": - raise NotImplemented - raise NotImplemented - - # async def load_from_data(self, data: Dict): - # self.data = data.copy() - - async def load_from_config(self): - data = await self.config.guild_from_id(self.guild_id).tasks.get_raw( - self.name, default=None - ) - - if not data: - return - - self.author_id = data["author_id"] - self.guild_id = data["guild_id"] - self.channel_id = data["channel_id"] - - self.data = data["data"] - - await self._decode_time_triggers() - return self.data - - async def get_triggers(self) -> List[Union[IntervalTrigger, DateTrigger]]: - if not self.data: - await self.load_from_config() - - if self.data is None or "triggers" not in self.data: # No triggers - return [] - - return [get_trigger(t) for t in self.data["triggers"]] - - async def get_combined_trigger(self) -> Union[BaseTrigger, None]: - if not self.data: - await self.load_from_config() - - return parse_triggers(self.data) - - # async def set_job_id(self, job_id): - # if self.data is None: - # await self.load_from_config() - # - # self.data["job_id"] = job_id - - async def save_all(self): - """To be used when creating an new task""" - - data_to_save = self.default_task_data.copy() - if self.data: - data_to_save["command_str"] = self.get_command_str() - data_to_save["triggers"] = await self._encode_time_triggers() - - to_save = { - "guild_id": self.guild_id, - "author_id": self.author_id, - "channel_id": self.channel_id, - "data": data_to_save, - } - await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save) - - async def save_data(self): - """To be used when updating triggers""" - if not self.data: - return - - data_to_save = self.data.copy() - data_to_save["triggers"] = await self._encode_time_triggers() - - await self.config.guild_from_id(self.guild_id).tasks.set_raw( - self.name, "data", value=data_to_save - ) - - async def execute(self): - if not self.data or not self.get_command_str(): - log.warning(f"Could not execute task due to data problem: {self.data=}") - return False - - guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix - if guild is None: - log.warning(f"Could not execute task due to missing guild: {self.guild_id}") - return False - channel: discord.TextChannel = guild.get_channel(self.channel_id) - if channel is None: - log.warning(f"Could not execute task due to missing channel: {self.channel_id}") - return False - author: discord.User = guild.get_member(self.author_id) - if author is None: - log.warning(f"Could not execute task due to missing author: {self.author_id}") - return False - - actual_message: discord.Message = channel.last_message - if actual_message is None: - log.warning("No message found in channel cache yet, skipping execution") - return - - message = FakeMessage(actual_message) - # message = FakeMessage2 - message.author = author - message.id = time_snowflake(datetime.now()) # Pretend to be now - message.add_reaction = _do_nothing - - prefixes = await self.bot.get_prefix(message) - if isinstance(prefixes, str): - prefix = prefixes - else: - prefix = prefixes[0] - - message.content = f"{prefix}{self.get_command_str()}" - - if not message.guild or not message.author or not message.content: - log.warning(f"Could not execute task due to message problem: {message}") - return False - - new_ctx: commands.Context = await self.bot.get_context(message) - new_ctx.assume_yes = True - if not new_ctx.valid: - log.warning(f"Could not execute task due invalid context: {new_ctx}") - return False - - await self.bot.invoke(new_ctx) - return True - - async def set_bot(self, bot: Red): - self.bot = bot - - async def set_author(self, author: Union[discord.User, str]): - self.author_id = getattr(author, "id", None) or author - - def get_command_str(self): - return self.data.get("command_str", "") - - async def set_commmand_str(self, command_str): - if not self.data: - self.data = self.default_task_data.copy() - self.data["command_str"] = command_str - return True - - async def add_trigger(self, param, parsed_time: Union[timedelta, datetime]): - trigger_data = {"type": param, "time_data": parsed_time} - if not get_trigger(trigger_data): - return False - - if not self.data: - self.data = self.default_task_data.copy() - - self.data["triggers"].append(trigger_data) - return True - - def __setstate__(self, task_state): - self.name = task_state["name"] - self.guild_id = task_state["guild_id"] - self.config = task_state["config"] - self.bot = None - self.author_id = None - self.channel_id = None - self.data = None - - def __getstate__(self): - return { - "name": self.name, - "guild_id": self.guild_id, - "config": self.config, - "bot": self.bot, - } - - def _assemble_job_id(task_name, guild_id): return f"{task_name}_{guild_id}" @@ -335,28 +54,34 @@ class FIFO(commands.Cog): self.config.register_global(**default_global) self.config.register_guild(**default_guild) - from .redconfigjobstore import RedConfigJobStore + self.scheduler = None + self.jobstore = None - jobstores = {"default": RedConfigJobStore(self.config, self.bot)} + async def red_delete_data_for_user(self, **kwargs): + """Nothing to delete""" + return + + def cog_unload(self): + # self.scheduler.remove_all_jobs() + if self.scheduler is not None: + self.scheduler.shutdown() + + async def initialize(self): job_defaults = {"coalesce": False, "max_instances": 1} # executors = {"default": AsyncIOExecutor()} # Default executor is already AsyncIOExecutor - self.scheduler = AsyncIOScheduler( - jobstores=jobstores, job_defaults=job_defaults, logger=schedule_log - ) + self.scheduler = AsyncIOScheduler(job_defaults=job_defaults, logger=schedule_log) - self.scheduler.start() # TODO: Jobs are not receiving next_run_times + from .redconfigjobstore import RedConfigJobStore - async def red_delete_data_for_user(self, **kwargs): - """Nothing to delete""" - return + self.jobstore = RedConfigJobStore(self.config, self.bot) + await self.jobstore.load_from_config(self.scheduler, "default") + self.scheduler.add_jobstore(self.jobstore, "default") - def cog_unload(self): - # self.scheduler.remove_all_jobs() - self.scheduler.shutdown() + self.scheduler.start() # TODO: Jobs are not receiving next_run_times async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str): message: discord.Message = ctx.message @@ -400,6 +125,7 @@ class FIFO(commands.Cog): return self.scheduler.remove_job(job_id=_assemble_job_id(task.name, task.guild_id)) @checks.is_owner() + @commands.guild_only() @commands.command() async def fifoclear(self, ctx: commands.Context): """Debug command to clear all current fifo data""" @@ -410,6 +136,7 @@ class FIFO(commands.Cog): await ctx.tick() @checks.is_owner() # Will be reduced when I figure out permissions later + @commands.guild_only() @commands.group() async def fifo(self, ctx: commands.Context): """ @@ -418,8 +145,46 @@ class FIFO(commands.Cog): if ctx.invoked_subcommand is None: pass + @fifo.command(name="set") + async def fifo_setauthor(self, ctx: commands.Context, task_name: str, author_or_channel: Union[discord.Member, discord.TextChannel]): + """ + Sets the task to be executed as a different author or in a different channel. + """ + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + if isinstance(author_or_channel, discord.Member): + if task.author_id == author_or_channel.id: + await ctx.maybe_send_embed("Already executing as that member") + return + + await task.set_author(author_or_channel) # also saves + elif isinstance(author_or_channel, discord.TextChannel): + if task.channel_id == author_or_channel.id: + await ctx.maybe_send_embed("Already executing in that channel") + return + + await task.set_channel(author_or_channel) + else: + await ctx.maybe_send_embed("Unsupported result") + return + + await ctx.tick() + @fifo.command(name="resume") async def fifo_resume(self, ctx: commands.Context, task_name: Optional[str] = None): + """ + Provide a task name to resume execution of a task. + + Otherwise resumes execution of all tasks on all guilds + If the task isn't currently scheduled, will schedule it + """ if task_name is None: if self.scheduler.state == STATE_PAUSED: self.scheduler.resume() @@ -443,6 +208,11 @@ class FIFO(commands.Cog): @fifo.command(name="pause") async def fifo_pause(self, ctx: commands.Context, task_name: Optional[str] = None): + """ + Provide a task name to pause execution of a task + + Otherwise pauses execution of all tasks on all guilds + """ if task_name is None: if self.scheduler.state == STATE_RUNNING: self.scheduler.pause() @@ -468,7 +238,6 @@ class FIFO(commands.Cog): async def fifo_details(self, ctx: commands.Context, task_name: str): """ Provide all the details on the specified task name - """ task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) await task.load_from_config() @@ -633,33 +402,31 @@ class FIFO(commands.Cog): @fifo_trigger.command(name="cron") async def fifo_trigger_cron( - self, ctx: commands.Context, task_name: str, cron_settings: DictConverter + self, ctx: commands.Context, task_name: str, *, cron_str: CronConverter ): """ Add a "time of day" trigger to the specified task """ - await ctx.maybe_send_embed("Not yet implemented") - - # async def load_tasks(self): - # """ - # Run once on cog load. - # """ - # all_guilds = await self.config.all_guilds() - # async for guild_id, guild_data in AsyncIter(all_guilds["tasks"].items(), steps=100): - # for task_name, task_data in guild_data["tasks"].items(): - # task = Task(task_name, guild_id, self.config) - # await task.load_from_data(task_data) - # - # job = self.scheduler.add_job( - # task.execute, id=task_name + "_" + guild_id, trigger=await task.get_combined_trigger(), - # ) - # - # self.scheduler.start() - - # async def parent_loop(self): - # await asyncio.sleep(60) - # asyncio.create_task(self.process_tasks(datetime.datetime.utcnow())) - # - # async def process_tasks(self, now: datetime.datetime): - # for task in self.tasks: - # pass + task = Task(task_name, ctx.guild.id, self.config) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + result = await task.add_trigger("cron", cron_str) + if not result: + await ctx.maybe_send_embed( + "Failed to add a cron trigger to this task, see console for logs" + ) + return + + await task.save_data() + job: Job = await self._process_task(task) + delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo) + await ctx.maybe_send_embed( + f"Task `{task_name}` added cron_str to its scheduled runtimes\n" + f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)" + ) diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index cbbe0ab..3747a97 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -3,16 +3,19 @@ import base64 import logging import pickle from datetime import datetime +from time import sleep from typing import Tuple, Union from apscheduler.job import Job from apscheduler.jobstores.base import ConflictingIdError, JobLookupError from apscheduler.jobstores.memory import MemoryJobStore +from apscheduler.schedulers.asyncio import run_in_event_loop from apscheduler.util import datetime_to_utc_timestamp from redbot.core import Config # TODO: use get_lock on config from redbot.core.bot import Red +from redbot.core.utils import AsyncIter log = logging.getLogger("red.fox_v3.fifo.jobstore") log.setLevel(logging.DEBUG) @@ -26,19 +29,29 @@ class RedConfigJobStore(MemoryJobStore): self.config = config self.bot = bot self.pickle_protocol = pickle.HIGHEST_PROTOCOL - asyncio.ensure_future(self._load_from_config(), loop=self.bot.loop) + self._eventloop = self.bot.loop + # TODO: self.config.jobs_index is never read from, + # either remove or replace self._jobs_index - async def _load_from_config(self): - self._jobs = await self.config.jobs() + # task = asyncio.create_task(self.load_from_config()) + # while not task.done(): + # sleep(0.1) + # future = asyncio.ensure_future(self.load_from_config(), loop=self.bot.loop) + + @run_in_event_loop + def start(self, scheduler, alias): + super().start(scheduler, alias) + + async def load_from_config(self, scheduler, alias): + super().start(scheduler, alias) + _jobs = await self.config.jobs() self._jobs = [ - (await self._decode_job(job["job_state"]), timestamp) - for (job, timestamp) in self._jobs + (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs) ] - self._jobs_index = await self.config.jobs_index.all() + # self._jobs_index = await self.config.jobs_index.all() # Overwritten by next self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs} def _encode_job(self, job: Job): - # log.debug(f"Encoding job id: {job.id}") job_state = job.__getstate__() new_args = list(job_state["args"]) new_args[0]["config"] = None @@ -54,9 +67,15 @@ class RedConfigJobStore(MemoryJobStore): new_args[0]["config"] = self.config new_args[0]["bot"] = self.bot job_state["args"] = tuple(new_args) + # log.debug(f"Encoding job id: {job.id}\n" + # f"Encoded as: {out}") + return out - async def _decode_job(self, job_state): + async def _decode_job(self, in_job): + if in_job is None: + return None + job_state = in_job["job_state"] job_state = pickle.loads(base64.b64decode(job_state)) new_args = list(job_state["args"]) new_args[0]["config"] = self.config @@ -73,10 +92,12 @@ class RedConfigJobStore(MemoryJobStore): # # job.func = task.execute - # log.debug(f"Decoded job id: {job.id}") + # log.debug(f"Decoded job id: {job.id}\n" + # f"Decoded as {job_state}") return job + @run_in_event_loop def add_job(self, job: Job): if job.id in self._jobs_index: raise ConflictingIdError(job.id) @@ -89,11 +110,14 @@ class RedConfigJobStore(MemoryJobStore): # log.debug(f"Added job: {self._jobs[index][0].args}") async def _async_add_job(self, job, index, timestamp): + encoded_job = self._encode_job(job) + job_tuple = tuple([encoded_job, timestamp]) async with self.config.jobs() as jobs: - jobs.insert(index, (self._encode_job(job), timestamp)) - await self.config.jobs_index.set_raw(job.id, value=(self._encode_job(job), timestamp)) + jobs.insert(index, job_tuple) + await self.config.jobs_index.set_raw(job.id, value=job_tuple) return True + @run_in_event_loop def update_job(self, job): old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get( job.id, (None, None) @@ -107,9 +131,8 @@ class RedConfigJobStore(MemoryJobStore): # Otherwise, reinsert the job to the list to preserve the ordering. old_index = self._get_job_index(old_timestamp, old_job.id) new_timestamp = datetime_to_utc_timestamp(job.next_run_time) - asyncio.ensure_future( - self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), - loop=self.bot.loop, + asyncio.create_task( + self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp) ) async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): @@ -131,6 +154,7 @@ class RedConfigJobStore(MemoryJobStore): log.debug(f"Async Updated {job.id=}") log.debug(f"Check job args: {job.args=}") + @run_in_event_loop def remove_job(self, job_id): job, timestamp = self._jobs_index.get(job_id, (None, None)) if job is None: @@ -146,6 +170,7 @@ class RedConfigJobStore(MemoryJobStore): del jobs[index] await self.config.jobs_index.clear_raw(job.id) + @run_in_event_loop def remove_all_jobs(self): super().remove_all_jobs() asyncio.create_task(self._async_remove_all_jobs()) @@ -154,6 +179,10 @@ class RedConfigJobStore(MemoryJobStore): await self.config.jobs.clear() await self.config.jobs_index.clear() + def shutdown(self): + """Removes all jobs without clearing config""" + super().remove_all_jobs() + # import asyncio # diff --git a/fifo/task.py b/fifo/task.py new file mode 100644 index 0000000..62d50ca --- /dev/null +++ b/fifo/task.py @@ -0,0 +1,325 @@ +import logging +from datetime import datetime, timedelta +from typing import Dict, List, Union + +import discord +from apscheduler.triggers.base import BaseTrigger +from apscheduler.triggers.combining import OrTrigger +from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.cron import CronTrigger +from discord.utils import time_snowflake +from redbot.core import Config, commands +from redbot.core.bot import Red + +log = logging.getLogger("red.fox_v3.fifo.task") + + +async def _do_nothing(*args, **kwargs): + pass + + +def get_trigger(data): + if data["type"] == "interval": + parsed_time = data["time_data"] + return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds) + + if data["type"] == "date": + return DateTrigger(data["time_data"]) + + if data["type"] == "cron": + return CronTrigger.from_crontab(data["time_data"]) + + return False + + +def parse_triggers(data: Union[Dict, None]): + if data is None or not data.get("triggers", False): # No triggers + return None + + if len(data["triggers"]) > 1: # Multiple triggers + return OrTrigger(get_trigger(t_data) for t_data in data["triggers"]) + + return get_trigger(data["triggers"][0]) + + +class FakeMessage: + def __init__(self, message: discord.Message): + d = {k: getattr(message, k, None) for k in dir(message)} + self.__dict__.update(**d) + + +def neuter_message(message: FakeMessage): + message.delete = _do_nothing + message.edit = _do_nothing + message.publish = _do_nothing + message.pin = _do_nothing + message.unpin = _do_nothing + message.add_reaction = _do_nothing + message.remove_reaction = _do_nothing + message.clear_reaction = _do_nothing + message.clear_reactions = _do_nothing + message.ack = _do_nothing + + return message + + +class Task: + default_task_data = {"triggers": [], "command_str": ""} + + default_trigger = { + "type": "", + "time_data": None, # Used for Interval and Date Triggers + } + + def __init__( + self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None + ): + self.name = name + self.guild_id = guild_id + self.config = config + self.bot = bot + self.author_id = author_id + self.channel_id = channel_id + self.data = None + + async def _encode_time_triggers(self): + if not self.data or not self.data.get("triggers", None): + return [] + + triggers = [] + for t in self.data["triggers"]: + if t["type"] == "interval": # Convert into timedelta + td: timedelta = t["time_data"] + + triggers.append( + {"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}} + ) + continue + + if t["type"] == "date": # Convert into datetime + dt: datetime = t["time_data"] + triggers.append({"type": t["type"], "time_data": dt.isoformat()}) + # triggers.append( + # { + # "type": t["type"], + # "time_data": { + # "year": dt.year, + # "month": dt.month, + # "day": dt.day, + # "hour": dt.hour, + # "minute": dt.minute, + # "second": dt.second, + # "tzinfo": dt.tzinfo, + # }, + # } + # ) + continue + + if t["type"] == "cron": # TODO: Implement this, should be easy + raise NotImplemented + raise NotImplemented + + return triggers + + async def _decode_time_triggers(self): + if not self.data or not self.data.get("triggers", None): + return + + for n, t in enumerate(self.data["triggers"]): + if t["type"] == "interval": # Convert into timedelta + self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"]) + continue + + if t["type"] == "date": # Convert into datetime + # self.data["triggers"][n]["time_data"] = datetime(**t["time_data"]) + self.data["triggers"][n]["time_data"] = datetime.fromisoformat(t["time_data"]) + continue + + if t["type"] == "cron": + raise NotImplemented + raise NotImplemented + + # async def load_from_data(self, data: Dict): + # self.data = data.copy() + + async def load_from_config(self): + data = await self.config.guild_from_id(self.guild_id).tasks.get_raw( + self.name, default=None + ) + + if not data: + return + + self.author_id = data["author_id"] + self.guild_id = data["guild_id"] + self.channel_id = data["channel_id"] + + self.data = data["data"] + + await self._decode_time_triggers() + return self.data + + async def get_triggers(self) -> List[Union[IntervalTrigger, DateTrigger]]: + if not self.data: + await self.load_from_config() + + if self.data is None or "triggers" not in self.data: # No triggers + return [] + + return [get_trigger(t) for t in self.data["triggers"]] + + async def get_combined_trigger(self) -> Union[BaseTrigger, None]: + if not self.data: + await self.load_from_config() + + return parse_triggers(self.data) + + # async def set_job_id(self, job_id): + # if self.data is None: + # await self.load_from_config() + # + # self.data["job_id"] = job_id + + async def save_all(self): + """To be used when creating an new task""" + + data_to_save = self.default_task_data.copy() + if self.data: + data_to_save["command_str"] = self.get_command_str() + data_to_save["triggers"] = await self._encode_time_triggers() + + to_save = { + "guild_id": self.guild_id, + "author_id": self.author_id, + "channel_id": self.channel_id, + "data": data_to_save, + } + await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save) + + async def save_data(self): + """To be used when updating triggers""" + if not self.data: + return + + data_to_save = self.data.copy() + data_to_save["triggers"] = await self._encode_time_triggers() + + await self.config.guild_from_id(self.guild_id).tasks.set_raw( + self.name, "data", value=data_to_save + ) + + async def execute(self): + if not self.data or not self.get_command_str(): + log.warning(f"Could not execute task due to data problem: {self.data=}") + return False + + guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix + if guild is None: + log.warning(f"Could not execute task due to missing guild: {self.guild_id}") + return False + channel: discord.TextChannel = guild.get_channel(self.channel_id) + if channel is None: + log.warning(f"Could not execute task due to missing channel: {self.channel_id}") + return False + author: discord.User = guild.get_member(self.author_id) + if author is None: + log.warning(f"Could not execute task due to missing author: {self.author_id}") + return False + + actual_message: discord.Message = channel.last_message + # I'd like to present you my chain of increasingly desperate message fetching attempts + if actual_message is None: + # log.warning("No message found in channel cache yet, skipping execution") + # return + actual_message = await channel.fetch_message(channel.last_message_id) + if actual_message is None: # last_message_id was an invalid message I guess + actual_message = await channel.history(limit=1).flatten() + if not actual_message: # Basically only happens if the channel has no messages + actual_message = await author.history(limit=1).flatten() + if not actual_message: # Okay, the *author* has never sent a message? + log.warning("No message found in channel cache yet, skipping execution") + return + actual_message = actual_message[0] + + message = FakeMessage(actual_message) + # message = FakeMessage2 + message.author = author + message.guild = guild # Just in case we got desperate + message.channel = channel + message.id = time_snowflake(datetime.now()) # Pretend to be now + message = neuter_message(message) + + # absolutely weird that this takes a message object instead of guild + prefixes = await self.bot.get_prefix(message) + if isinstance(prefixes, str): + prefix = prefixes + else: + prefix = prefixes[0] + + message.content = f"{prefix}{self.get_command_str()}" + + if not message.guild or not message.author or not message.content: + log.warning(f"Could not execute task due to message problem: {message}") + return False + + new_ctx: commands.Context = await self.bot.get_context(message) + new_ctx.assume_yes = True + if not new_ctx.valid: + log.warning(f"Could not execute task due invalid context: {new_ctx}") + return False + + await self.bot.invoke(new_ctx) + return True + + async def set_bot(self, bot: Red): + self.bot = bot + + async def set_author(self, author: Union[discord.User, discord.Member, str]): + self.author_id = getattr(author, "id", None) or author + await self.config.guild_from_id(self.guild_id).tasks.set_raw( + self.name, "author_id", value=self.author_id + ) + + async def set_channel(self, channel: Union[discord.TextChannel, str]): + self.channel_id = getattr(channel, "id", None) or channel + await self.config.guild_from_id(self.guild_id).tasks.set_raw( + self.name, "channel_id", value=self.author_id + ) + + def get_command_str(self): + return self.data.get("command_str", "") + + async def set_commmand_str(self, command_str): + if not self.data: + self.data = self.default_task_data.copy() + self.data["command_str"] = command_str + return True + + async def add_trigger(self, param, parsed_time: Union[timedelta, datetime, str]): + trigger_data = {"type": param, "time_data": parsed_time} + if not get_trigger(trigger_data): + return False + + if not self.data: + self.data = self.default_task_data.copy() + + self.data["triggers"].append(trigger_data) + return True + + def __setstate__(self, task_state): + self.name = task_state["name"] + self.guild_id = task_state["guild_id"] + self.config = task_state["config"] + self.bot = None + self.author_id = None + self.channel_id = None + self.data = None + + def __getstate__(self): + return { + "name": self.name, + "guild_id": self.guild_id, + "config": self.config, + "bot": self.bot, + } From 607b7b67183bc12667d6f4db9b4e2b9ad298f49d Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 3 Sep 2020 10:04:10 -0400 Subject: [PATCH 09/12] Ready for release? --- fifo/fifo.py | 54 +++++++++++++++++++++++++++++++++++++++++++++----- fifo/redjob.py | 44 ---------------------------------------- fifo/task.py | 20 +++++++++++++++---- 3 files changed, 65 insertions(+), 53 deletions(-) delete mode 100644 fifo/redjob.py diff --git a/fifo/fifo.py b/fifo/fifo.py index 10b72af..d4156ea 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -93,6 +93,13 @@ class FIFO(commands.Cog): return new_ctx.valid + async def _delete_task(self, task: Task): + job: Union[Job, None] = await self._get_job(task) + if job is not None: + job.remove() + + await task.delete_self() + async def _process_task(self, task: Task): job: Union[Job, None] = await self._get_job(task) if job is not None: @@ -146,9 +153,14 @@ class FIFO(commands.Cog): pass @fifo.command(name="set") - async def fifo_setauthor(self, ctx: commands.Context, task_name: str, author_or_channel: Union[discord.Member, discord.TextChannel]): + async def fifo_set( + self, + ctx: commands.Context, + task_name: str, + author_or_channel: Union[discord.Member, discord.TextChannel], + ): """ - Sets the task to be executed as a different author or in a different channel. + Sets a different author or in a different channel for execution of a task. """ task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) await task.load_from_config() @@ -327,9 +339,39 @@ class FIFO(commands.Cog): """ Deletes a task from this guild's task list """ - pass + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) + await task.load_from_config() + + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + await self._delete_task(task) + await ctx.maybe_send_embed(f"Task[{task_name}] has been deleted from this guild") + + @fifo.command(name="cleartriggers", aliases=["cleartrigger"]) + async def fifo_cleartriggers(self, ctx: commands.Context, task_name: str): + """ + Removes all triggers from specified task + + Useful to start over with new trigger + """ + + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) + await task.load_from_config() - @fifo.group(name="trigger") + if task.data is None: + await ctx.maybe_send_embed( + f"Task by the name of {task_name} is not found in this guild" + ) + return + + await task.clear_triggers() + await ctx.tick() + + @fifo.group(name="addtrigger", aliases=["trigger"]) async def fifo_trigger(self, ctx: commands.Context): """ Add a new trigger for a task from the current guild. @@ -405,7 +447,9 @@ class FIFO(commands.Cog): self, ctx: commands.Context, task_name: str, *, cron_str: CronConverter ): """ - Add a "time of day" trigger to the specified task + Add a cron "time of day" trigger to the specified task + + See https://crontab.guru/ for help generating the cron_str """ task = Task(task_name, ctx.guild.id, self.config) await task.load_from_config() diff --git a/fifo/redjob.py b/fifo/redjob.py deleted file mode 100644 index c276aa4..0000000 --- a/fifo/redjob.py +++ /dev/null @@ -1,44 +0,0 @@ -import six -from apscheduler.job import Job -from apscheduler.schedulers.asyncio import AsyncIOScheduler -from apscheduler.schedulers.base import STATE_STOPPED -from apscheduler.util import undefined - - -class RedJob(Job): - pass - - -class RedAsyncIOScheduler(AsyncIOScheduler): - - def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, - misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, - next_run_time=undefined, jobstore='default', executor='default', - replace_existing=False, **trigger_args): - job_kwargs = { - 'trigger': self._create_trigger(trigger, trigger_args), - 'executor': executor, - 'func': func, - 'args': tuple(args) if args is not None else (), - 'kwargs': dict(kwargs) if kwargs is not None else {}, - 'id': id, - 'name': name, - 'misfire_grace_time': misfire_grace_time, - 'coalesce': coalesce, - 'max_instances': max_instances, - 'next_run_time': next_run_time - } - job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if - value is not undefined) - job = RedJob(self, **job_kwargs) - - # Don't really add jobs to job stores before the scheduler is up and running - with self._jobstores_lock: - if self.state == STATE_STOPPED: - self._pending_jobs.append((job, jobstore, replace_existing)) - self._logger.info('Adding job tentatively -- it will be properly scheduled when ' - 'the scheduler starts') - else: - self._real_add_job(job, jobstore, replace_existing) - - return job diff --git a/fifo/task.py b/fifo/task.py index 62d50ca..9ed4e12 100644 --- a/fifo/task.py +++ b/fifo/task.py @@ -117,7 +117,9 @@ class Task: continue if t["type"] == "cron": # TODO: Implement this, should be easy - raise NotImplemented + triggers.append(t) # already a string, nothing to do + + continue raise NotImplemented return triggers @@ -137,7 +139,7 @@ class Task: continue if t["type"] == "cron": - raise NotImplemented + continue # already a string raise NotImplemented # async def load_from_data(self, data: Dict): @@ -266,7 +268,9 @@ class Task: new_ctx: commands.Context = await self.bot.get_context(message) new_ctx.assume_yes = True if not new_ctx.valid: - log.warning(f"Could not execute task due invalid context: {new_ctx}") + log.warning( + f"Could not execute Task[{self.name}] due invalid context: {new_ctx.invoked_with}" + ) return False await self.bot.invoke(new_ctx) @@ -284,7 +288,7 @@ class Task: async def set_channel(self, channel: Union[discord.TextChannel, str]): self.channel_id = getattr(channel, "id", None) or channel await self.config.guild_from_id(self.guild_id).tasks.set_raw( - self.name, "channel_id", value=self.author_id + self.name, "channel_id", value=self.channel_id ) def get_command_str(self): @@ -323,3 +327,11 @@ class Task: "config": self.config, "bot": self.bot, } + + async def clear_triggers(self): + self.data["triggers"] = [] + await self.save_data() + + async def delete_self(self): + """Hopefully nothing uses the object after running this...""" + await self.config.guild_from_id(self.guild_id).tasks.clear_raw(self.name) From 4f494d115d8607981f0cd71d4e662d697aba0d7b Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 3 Sep 2020 10:17:05 -0400 Subject: [PATCH 10/12] Fifo release ready --- README.md | 1 + fifo/fifo.py | 2 +- fifo/info.json | 17 ++++-- fifo/redconfigjobstore.py | 112 ++------------------------------------ fifo/task.py | 2 +- 5 files changed, 21 insertions(+), 113 deletions(-) diff --git a/README.md b/README.md index d8399cc..3a9ab8f 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ Cog Function | conquest | **Alpha** |
Manage maps for war games and RPGsLots of additional features are planned, currently function with simple map
| | dad | **Beta** |
Tell dad jokesWorks great!
| | exclusiverole | **Alpha** |
Prevent certain roles from getting any other rolesFully functional, but pretty simple
| +| fifo | **Alpha** |
Schedule commands to be run at certain times or intervalsJust released, please report bugs as you find them. Only works for bot owner for now
| | fight | **Incomplete** |
Organize bracket tournaments within discordStill in-progress, a massive project
| | flag | **Alpha** |
Create temporary marks on users that expire after specified timePorted, will not import old data. Please report bugs
| | forcemention | **Alpha** |
Mentions unmentionable rolesVery simple cog, mention doesn't persist
| diff --git a/fifo/fifo.py b/fifo/fifo.py index d4156ea..9e0580e 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -81,7 +81,7 @@ class FIFO(commands.Cog): await self.jobstore.load_from_config(self.scheduler, "default") self.scheduler.add_jobstore(self.jobstore, "default") - self.scheduler.start() # TODO: Jobs are not receiving next_run_times + self.scheduler.start() async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str): message: discord.Message = ctx.message diff --git a/fifo/info.json b/fifo/info.json index 6cc6f9d..c8c2ed9 100644 --- a/fifo/info.json +++ b/fifo/info.json @@ -3,19 +3,26 @@ "Bobloy" ], "min_bot_version": "3.3.0", - "description": "Schedule commands to be run by certain at certain times or intervals", + "description": "[ALPHA] Schedule commands to be run at certain times or intervals", "hidden": false, "install_msg": "Thank you for installing FIFO.\nGet started with `[p]load fifo`, then `[p]help FIFO`", - "short": "Schedule commands to be run by certain at certain times or intervals\"", + "short": "[ALPHA] Schedule commands to be run at certain times or intervals\"", "end_user_data_statement": "This cog does not store any End User Data", "requirements": [ - "apscheduler" + "apscheduler", + "dateutil" ], "tags": [ "bobloy", "utilities", - "tools", "tool", - "roles" + "roles", + "schedule", + "cron", + "interval", + "date", + "datetime", + "time", + "calendar" ] } \ No newline at end of file diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index 3747a97..aa6d967 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -30,8 +30,8 @@ class RedConfigJobStore(MemoryJobStore): self.bot = bot self.pickle_protocol = pickle.HIGHEST_PROTOCOL self._eventloop = self.bot.loop - # TODO: self.config.jobs_index is never read from, - # either remove or replace self._jobs_index + # TODO: self.config.jobs_index is never used, + # fine but maybe a sign of inefficient use of config # task = asyncio.create_task(self.load_from_config()) # while not task.done(): @@ -114,7 +114,7 @@ class RedConfigJobStore(MemoryJobStore): job_tuple = tuple([encoded_job, timestamp]) async with self.config.jobs() as jobs: jobs.insert(index, job_tuple) - await self.config.jobs_index.set_raw(job.id, value=job_tuple) + # await self.config.jobs_index.set_raw(job.id, value=job_tuple) return True @run_in_event_loop @@ -149,7 +149,7 @@ class RedConfigJobStore(MemoryJobStore): del jobs[old_index] jobs.insert(new_index, (encoded_job, new_timestamp)) self._jobs_index[old_job.id] = (job, new_timestamp) - await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) + # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) log.debug(f"Async Updated {job.id=}") log.debug(f"Check job args: {job.args=}") @@ -168,7 +168,7 @@ class RedConfigJobStore(MemoryJobStore): async def _async_remove_job(self, index, job): async with self.config.jobs() as jobs: del jobs[index] - await self.config.jobs_index.clear_raw(job.id) + # await self.config.jobs_index.clear_raw(job.id) @run_in_event_loop def remove_all_jobs(self): @@ -177,108 +177,8 @@ class RedConfigJobStore(MemoryJobStore): async def _async_remove_all_jobs(self): await self.config.jobs.clear() - await self.config.jobs_index.clear() + # await self.config.jobs_index.clear() def shutdown(self): """Removes all jobs without clearing config""" super().remove_all_jobs() - - -# import asyncio -# -# from apscheduler.jobstores.base import BaseJobStore, ConflictingIdError -# from apscheduler.util import datetime_to_utc_timestamp -# from redbot.core import Config -# from redbot.core.utils import AsyncIter -# -# -# class RedConfigJobStore(BaseJobStore): -# def __init__(self, config: Config, loop): -# super().__init__() -# self.config = config -# self.loop: asyncio.BaseEventLoop = loop -# -# self._jobs = [] -# self._jobs_index = {} # id -> (job, timestamp) lookup table -# -# def lookup_job(self, job_id): -# return asyncio.run(self._async_lookup_job(job_id)) -# -# async def _async_lookup_job(self, job_id): -# return (await self.config.jobs_index.get_raw(job_id, default=(None, None)))[0] -# -# def get_due_jobs(self, now): -# return asyncio.run(self._async_get_due_jobs(now)) -# -# async def _async_get_due_jobs(self, now): -# now_timestamp = datetime_to_utc_timestamp(now) -# pending = [] -# all_jobs = await self.config.jobs() -# async for job, timestamp in AsyncIter(all_jobs, steps=100): -# if timestamp is None or timestamp > now_timestamp: -# break -# pending.append(job) -# -# return pending -# -# def get_next_run_time(self): -# return asyncio.run(self._async_get_next_run_time()) -# -# async def _async_get_next_run_time(self): -# _jobs = await self.config.jobs() -# return _jobs[0][0].next_run_time if _jobs else None -# -# def get_all_jobs(self): -# return asyncio.run(self._async_get_all_jobs()) -# -# async def _async_get_all_jobs(self): -# return [j[0] for j in (await self.config.jobs())] -# -# def add_job(self, job): -# return asyncio.run(self._async_add_job(job)) -# -# async def _async_add_job(self, job): -# if await self.config.jobs_index.get_raw(job.id, default=None) is not None: -# raise ConflictingIdError(job.id) -# -# timestamp = datetime_to_utc_timestamp(job.next_run_time) -# index = self._get_job_index(timestamp, job.id) -# self._jobs.insert(index, (job, timestamp)) -# self._jobs_index[job.id] = (job, timestamp) -# -# def update_job(self, job): -# pass -# -# def remove_job(self, job_id): -# pass -# -# def remove_all_jobs(self): -# pass -# -# def _get_job_index(self, timestamp, job_id): -# """ -# Returns the index of the given job, or if it's not found, the index where the job should be -# inserted based on the given timestamp. -# -# :type timestamp: int -# :type job_id: str -# -# """ -# lo, hi = 0, len(self._jobs) -# timestamp = float('inf') if timestamp is None else timestamp -# while lo < hi: -# mid = (lo + hi) // 2 -# mid_job, mid_timestamp = self._jobs[mid] -# mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp -# if mid_timestamp > timestamp: -# hi = mid -# elif mid_timestamp < timestamp: -# lo = mid + 1 -# elif mid_job.id > job_id: -# hi = mid -# elif mid_job.id < job_id: -# lo = mid + 1 -# else: -# return mid -# -# return lo diff --git a/fifo/task.py b/fifo/task.py index 9ed4e12..5d16ec9 100644 --- a/fifo/task.py +++ b/fifo/task.py @@ -116,7 +116,7 @@ class Task: # ) continue - if t["type"] == "cron": # TODO: Implement this, should be easy + if t["type"] == "cron": triggers.append(t) # already a string, nothing to do continue From 3d64bcf768f8518bc34ffd048e80ea4f77a5cf5c Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 3 Sep 2020 10:17:43 -0400 Subject: [PATCH 11/12] jobs_index is unused --- fifo/fifo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fifo/fifo.py b/fifo/fifo.py index 9e0580e..38bef79 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -48,7 +48,7 @@ class FIFO(commands.Cog): self.bot = bot self.config = Config.get_conf(self, identifier=70737079, force_registration=True) - default_global = {"jobs_index": {}, "jobs": []} + default_global = {"jobs": []} default_guild = {"tasks": {}} self.config.register_global(**default_global) From 12d0b2944ed511d6119c16d848f5e21ad1cd1bc4 Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 3 Sep 2020 10:26:10 -0400 Subject: [PATCH 12/12] Reformatting --- fifo/datetime_cron_converters.py | 5 +++-- fifo/fifo.py | 2 +- fifo/redconfigjobstore.py | 1 - fifo/task.py | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/fifo/datetime_cron_converters.py b/fifo/datetime_cron_converters.py index 5382b07..d59ef37 100644 --- a/fifo/datetime_cron_converters.py +++ b/fifo/datetime_cron_converters.py @@ -2,8 +2,8 @@ from datetime import datetime from typing import TYPE_CHECKING from apscheduler.triggers.cron import CronTrigger -from discord.ext.commands import BadArgument, Converter from dateutil import parser +from discord.ext.commands import BadArgument, Converter from fifo.timezones import assemble_timezones @@ -11,6 +11,7 @@ if TYPE_CHECKING: DatetimeConverter = datetime CronConverter = str else: + class DatetimeConverter(Converter): async def convert(self, ctx, argument) -> datetime: dt = parser.parse(argument, fuzzy=True, tzinfos=assemble_timezones()) @@ -25,4 +26,4 @@ else: except ValueError: raise BadArgument() - return argument \ No newline at end of file + return argument diff --git a/fifo/fifo.py b/fifo/fifo.py index 38bef79..e84e342 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -9,7 +9,7 @@ from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING from redbot.core import Config, checks, commands from redbot.core.bot import Red -from redbot.core.commands import DictConverter, TimedeltaConverter +from redbot.core.commands import TimedeltaConverter from .datetime_cron_converters import CronConverter, DatetimeConverter from .task import Task diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index aa6d967..7e68697 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -3,7 +3,6 @@ import base64 import logging import pickle from datetime import datetime -from time import sleep from typing import Tuple, Union from apscheduler.job import Job diff --git a/fifo/task.py b/fifo/task.py index 5d16ec9..83158d8 100644 --- a/fifo/task.py +++ b/fifo/task.py @@ -5,9 +5,9 @@ from typing import Dict, List, Union import discord from apscheduler.triggers.base import BaseTrigger from apscheduler.triggers.combining import OrTrigger +from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger -from apscheduler.triggers.cron import CronTrigger from discord.utils import time_snowflake from redbot.core import Config, commands from redbot.core.bot import Red