From ca8c762e691b33fc29821fad947c2b5284484c4e Mon Sep 17 00:00:00 2001 From: bobloy Date: Tue, 17 Nov 2020 15:14:50 -0500 Subject: [PATCH] FIFO resturcture --- fifo/fifo.py | 54 ++++++++-- fifo/redconfigjobstore.py | 218 ++++++++++++++++++++++---------------- fifo/task.py | 4 +- 3 files changed, 176 insertions(+), 100 deletions(-) diff --git a/fifo/fifo.py b/fifo/fifo.py index f060211..b3272b1 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -22,8 +22,8 @@ schedule_log.setLevel(logging.DEBUG) log = logging.getLogger("red.fox_v3.fifo") -async def _execute_task(task_state): - log.info(f"Executing {task_state=}") +async def _execute_task(**task_state): + log.info(f"Executing {task_state.get('name')}") task = Task(**task_state) if await task.load_from_config(): return await task.execute() @@ -60,6 +60,19 @@ def _get_run_times(job: Job, now: datetime = None): next_run_time = job.trigger.get_next_fire_time(next_run_time, now) +class CapturePrint: + """Silly little class to get `print` output""" + + def __init__(self): + self.string = None + + def write(self, string): + if self.string is None: + self.string = string + else: + self.string = self.string + "\n" + string + + class FIFO(commands.Cog): """ Simple Scheduling Cog @@ -78,7 +91,7 @@ class FIFO(commands.Cog): self.config.register_global(**default_global) self.config.register_guild(**default_guild) - self.scheduler = None + self.scheduler: Optional[AsyncIOScheduler] = None self.jobstore = None self.tz_cog = None @@ -94,7 +107,12 @@ class FIFO(commands.Cog): async def initialize(self): - job_defaults = {"coalesce": False, "max_instances": 1} + job_defaults = { + "coalesce": True, + "max_instances": 5, + "misfire_grace_time": 15, + "replace_existing": True, + } # executors = {"default": AsyncIOExecutor()} @@ -104,7 +122,7 @@ class FIFO(commands.Cog): from .redconfigjobstore import RedConfigJobStore self.jobstore = RedConfigJobStore(self.config, self.bot) - await self.jobstore.load_from_config(self.scheduler, "default") + await self.jobstore.load_from_config() self.scheduler.add_jobstore(self.jobstore, "default") self.scheduler.start() @@ -139,9 +157,10 @@ class FIFO(commands.Cog): async def _add_job(self, task: Task): return self.scheduler.add_job( _execute_task, - args=[task.__getstate__()], + kwargs=task.__getstate__(), id=_assemble_job_id(task.name, task.guild_id), trigger=await task.get_combined_trigger(), + name=task.name, ) async def _resume_job(self, task: Task): @@ -372,7 +391,7 @@ class FIFO(commands.Cog): Do `[p]fifo list True` to see tasks from all guilds """ if all_guilds: - pass + pass # TODO: All guilds else: out = "" all_tasks = await self.config.guild(ctx.guild).tasks() @@ -388,6 +407,27 @@ class FIFO(commands.Cog): else: await ctx.maybe_send_embed("No tasks to list") + @fifo.command(name="printschedule") + async def fifo_printschedule(self, ctx: commands.Context): + """ + Print the current schedule of execution. + + Useful for debugging. + """ + cp = CapturePrint() + self.scheduler.print_jobs(out=cp) + + out = cp.string + + if out: + if len(out) > 2000: + for page in pagify(out): + await ctx.maybe_send_embed(page) + else: + await ctx.maybe_send_embed(out) + else: + await ctx.maybe_send_embed("Failed to get schedule from scheduler") + @fifo.command(name="add") async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str): """ diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index 7e68697..27324f6 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -28,7 +28,7 @@ class RedConfigJobStore(MemoryJobStore): self.config = config self.bot = bot self.pickle_protocol = pickle.HIGHEST_PROTOCOL - self._eventloop = self.bot.loop + self._eventloop = self.bot.loop # Used for @run_in_event_loop # TODO: self.config.jobs_index is never used, # fine but maybe a sign of inefficient use of config @@ -40,32 +40,50 @@ class RedConfigJobStore(MemoryJobStore): @run_in_event_loop def start(self, scheduler, alias): super().start(scheduler, alias) + for job, timestamp in self._jobs: + job._scheduler = self._scheduler + job._jobstore_alias = self._alias - async def load_from_config(self, scheduler, alias): - super().start(scheduler, alias) + async def load_from_config(self): _jobs = await self.config.jobs() - self._jobs = [ - (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs) - ] + # self._jobs = [ + # (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs) + # ] + async for job, timestamp in AsyncIter(_jobs): + job = await self._decode_job(job) + index = self._get_job_index(timestamp, job.id) + self._jobs.insert(index, (job, timestamp)) + self._jobs_index[job.id] = (job, timestamp) + + async def save_to_config(self): + """Yea that's basically it""" + await self.config.jobs.set( + [(self._encode_job(job), timestamp) for job, timestamp in self._jobs] + ) + # self._jobs_index = await self.config.jobs_index.all() # Overwritten by next - self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs} + # self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs} def _encode_job(self, job: Job): job_state = job.__getstate__() - new_args = list(job_state["args"]) - new_args[0]["config"] = None - new_args[0]["bot"] = None - job_state["args"] = tuple(new_args) + job_state["kwargs"]["config"] = None + job_state["kwargs"]["bot"] = None + # new_kwargs = job_state["kwargs"] + # new_kwargs["config"] = None + # new_kwargs["bot"] = None + # job_state["kwargs"] = new_kwargs encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol)) out = { "_id": job.id, "next_run_time": datetime_to_utc_timestamp(job.next_run_time), "job_state": encoded.decode("ascii"), } - new_args = list(job_state["args"]) - new_args[0]["config"] = self.config - new_args[0]["bot"] = self.bot - job_state["args"] = tuple(new_args) + job_state["kwargs"]["config"] = self.config + job_state["kwargs"]["bot"] = self.bot + # new_kwargs = job_state["kwargs"] + # new_kwargs["config"] = self.config + # new_kwargs["bot"] = self.bot + # job_state["kwargs"] = new_kwargs # log.debug(f"Encoding job id: {job.id}\n" # f"Encoded as: {out}") @@ -76,10 +94,20 @@ class RedConfigJobStore(MemoryJobStore): return None job_state = in_job["job_state"] job_state = pickle.loads(base64.b64decode(job_state)) - new_args = list(job_state["args"]) - new_args[0]["config"] = self.config - new_args[0]["bot"] = self.bot - job_state["args"] = tuple(new_args) + if job_state["args"]: # Backwards compatibility on args to kwargs + job_state["kwargs"] = { + "name": job_state["args"][0], + "guild_id": job_state["args"][1], + "author_id": job_state["args"][2], + "channel_id": job_state["args"][3], + "bot": job_state["args"][4], + } + job_state["kwargs"]["config"] = self.config + job_state["kwargs"]["bot"] = self.bot + # new_kwargs = job_state["kwargs"] + # new_kwargs["config"] = self.config + # new_kwargs["bot"] = self.bot + # job_state["kwargs"] = new_kwargs job = Job.__new__(Job) job.__setstate__(job_state) job._scheduler = self._scheduler @@ -96,78 +124,82 @@ class RedConfigJobStore(MemoryJobStore): return job - @run_in_event_loop - def add_job(self, job: Job): - if job.id in self._jobs_index: - raise ConflictingIdError(job.id) - # log.debug(f"Check job args: {job.args=}") - timestamp = datetime_to_utc_timestamp(job.next_run_time) - index = self._get_job_index(timestamp, job.id) # This is fine - self._jobs.insert(index, (job, timestamp)) - self._jobs_index[job.id] = (job, timestamp) - asyncio.create_task(self._async_add_job(job, index, timestamp)) - # log.debug(f"Added job: {self._jobs[index][0].args}") - - async def _async_add_job(self, job, index, timestamp): - encoded_job = self._encode_job(job) - job_tuple = tuple([encoded_job, timestamp]) - async with self.config.jobs() as jobs: - jobs.insert(index, job_tuple) - # await self.config.jobs_index.set_raw(job.id, value=job_tuple) - return True - - @run_in_event_loop - def update_job(self, job): - old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get( - job.id, (None, None) - ) - old_job = old_tuple[0] - old_timestamp = old_tuple[1] - if old_job is None: - raise JobLookupError(job.id) - - # If the next run time has not changed, simply replace the job in its present index. - # Otherwise, reinsert the job to the list to preserve the ordering. - old_index = self._get_job_index(old_timestamp, old_job.id) - new_timestamp = datetime_to_utc_timestamp(job.next_run_time) - asyncio.create_task( - self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp) - ) - - async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): - encoded_job = self._encode_job(job) - if old_timestamp == new_timestamp: - self._jobs[old_index] = (job, new_timestamp) - async with self.config.jobs() as jobs: - jobs[old_index] = (encoded_job, new_timestamp) - else: - del self._jobs[old_index] - new_index = self._get_job_index(new_timestamp, job.id) # This is fine - self._jobs.insert(new_index, (job, new_timestamp)) - async with self.config.jobs() as jobs: - del jobs[old_index] - jobs.insert(new_index, (encoded_job, new_timestamp)) - self._jobs_index[old_job.id] = (job, new_timestamp) - # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) - - log.debug(f"Async Updated {job.id=}") - log.debug(f"Check job args: {job.args=}") - - @run_in_event_loop - def remove_job(self, job_id): - job, timestamp = self._jobs_index.get(job_id, (None, None)) - if job is None: - raise JobLookupError(job_id) - - index = self._get_job_index(timestamp, job_id) - del self._jobs[index] - del self._jobs_index[job.id] - asyncio.create_task(self._async_remove_job(index, job)) - - async def _async_remove_job(self, index, job): - async with self.config.jobs() as jobs: - del jobs[index] - # await self.config.jobs_index.clear_raw(job.id) + # @run_in_event_loop + # def add_job(self, job: Job): + # if job.id in self._jobs_index: + # raise ConflictingIdError(job.id) + # # log.debug(f"Check job args: {job.args=}") + # timestamp = datetime_to_utc_timestamp(job.next_run_time) + # index = self._get_job_index(timestamp, job.id) # This is fine + # self._jobs.insert(index, (job, timestamp)) + # self._jobs_index[job.id] = (job, timestamp) + # task = asyncio.create_task(self._async_add_job(job, index, timestamp)) + # self._eventloop.run_until_complete(task) + # # log.debug(f"Added job: {self._jobs[index][0].args}") + # + # async def _async_add_job(self, job, index, timestamp): + # encoded_job = self._encode_job(job) + # job_tuple = tuple([encoded_job, timestamp]) + # async with self.config.jobs() as jobs: + # jobs.insert(index, job_tuple) + # # await self.config.jobs_index.set_raw(job.id, value=job_tuple) + # return True + + # @run_in_event_loop + # def update_job(self, job): + # old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get( + # job.id, (None, None) + # ) + # old_job = old_tuple[0] + # old_timestamp = old_tuple[1] + # if old_job is None: + # raise JobLookupError(job.id) + # + # # If the next run time has not changed, simply replace the job in its present index. + # # Otherwise, reinsert the job to the list to preserve the ordering. + # old_index = self._get_job_index(old_timestamp, old_job.id) + # new_timestamp = datetime_to_utc_timestamp(job.next_run_time) + # task = asyncio.create_task( + # self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp) + # ) + # self._eventloop.run_until_complete(task) + # + # async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): + # encoded_job = self._encode_job(job) + # if old_timestamp == new_timestamp: + # self._jobs[old_index] = (job, new_timestamp) + # async with self.config.jobs() as jobs: + # jobs[old_index] = (encoded_job, new_timestamp) + # else: + # del self._jobs[old_index] + # new_index = self._get_job_index(new_timestamp, job.id) # This is fine + # self._jobs.insert(new_index, (job, new_timestamp)) + # async with self.config.jobs() as jobs: + # del jobs[old_index] + # jobs.insert(new_index, (encoded_job, new_timestamp)) + # self._jobs_index[old_job.id] = (job, new_timestamp) + # # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) + # + # log.debug(f"Async Updated {job.id=}") + # # log.debug(f"Check job args: {job.kwargs=}") + + # @run_in_event_loop + # def remove_job(self, job_id): + # """Copied instead of super for the asyncio args""" + # job, timestamp = self._jobs_index.get(job_id, (None, None)) + # if job is None: + # raise JobLookupError(job_id) + # + # index = self._get_job_index(timestamp, job_id) + # del self._jobs[index] + # del self._jobs_index[job.id] + # task = asyncio.create_task(self._async_remove_job(index, job)) + # self._eventloop.run_until_complete(task) + # + # async def _async_remove_job(self, index, job): + # async with self.config.jobs() as jobs: + # del jobs[index] + # # await self.config.jobs_index.clear_raw(job.id) @run_in_event_loop def remove_all_jobs(self): @@ -180,4 +212,8 @@ class RedConfigJobStore(MemoryJobStore): def shutdown(self): """Removes all jobs without clearing config""" + asyncio.create_task(self.async_shutdown()) + + async def async_shutdown(self): + await self.save_to_config() super().remove_all_jobs() diff --git a/fifo/task.py b/fifo/task.py index 7c51ee4..005230b 100644 --- a/fifo/task.py +++ b/fifo/task.py @@ -40,8 +40,8 @@ def parse_triggers(data: Union[Dict, None]): if len(data["triggers"]) > 1: # Multiple triggers return OrTrigger([get_trigger(t_data) for t_data in data["triggers"]]) - - return get_trigger(data["triggers"][0]) + else: + return get_trigger(data["triggers"][0]) class FakeMessage: