From e602b5c868e546bcdd3d736b2fb28b8f5fc3b713 Mon Sep 17 00:00:00 2001 From: bobloy Date: Thu, 27 Aug 2020 16:22:36 -0400 Subject: [PATCH] Almost working. Date time is only date, figure out what's going on with time. --- fifo/fifo.py | 214 ++++++++++++++++++++++++++++++-------- fifo/info.json | 3 + fifo/redconfigjobstore.py | 83 +++++++++++++-- fifo/redjob.py | 44 ++++++++ 4 files changed, 287 insertions(+), 57 deletions(-) create mode 100644 fifo/redjob.py diff --git a/fifo/fifo.py b/fifo/fifo.py index 1ffb612..c5ea125 100644 --- a/fifo/fifo.py +++ b/fifo/fifo.py @@ -1,19 +1,32 @@ +import logging from datetime import datetime, timedelta from typing import Dict, Union import discord +from apscheduler.job import Job from apscheduler.schedulers.asyncio import AsyncIOScheduler from apscheduler.triggers.base import BaseTrigger from apscheduler.triggers.combining import OrTrigger from apscheduler.triggers.date import DateTrigger from apscheduler.triggers.interval import IntervalTrigger -from dateutil import parser from redbot.core import Config, checks, commands from redbot.core.bot import Red -from redbot.core.commands import DictConverter, TimedeltaConverter, parse_timedelta +from redbot.core.commands import DictConverter, TimedeltaConverter from .datetimeconverter import DatetimeConverter -from .redconfigjobstore import RedConfigJobStore + +log = logging.getLogger("red.fox_v3.fifo") +schedule_log = logging.getLogger("red.fox_v3.fifo.scheduler") +schedule_log.setLevel(logging.DEBUG) +log.setLevel(logging.DEBUG) + + +async def _execute_task(task_state): + log.info(f"Executing {task_state=}") + task = Task(**task_state) + if await task.load_from_config(): + return await task.execute() + return False def get_trigger(data): @@ -37,11 +50,14 @@ def parse_triggers(data: Union[Dict, None]): if len(data["triggers"]) > 1: # Multiple triggers return OrTrigger(get_trigger(t_data) for t_data in data["triggers"]) - return get_trigger(data[0]) + return get_trigger(data["triggers"][0]) -class FakeMessage: - _state = None +# class FakeMessage(discord.Message): +# def __init__(self, *, state, channel, data): +# super().__init__(state=state, channel=channel, data=data) +# +# _state = None # class FakeMessage(discord.Message): @@ -57,35 +73,47 @@ class Task: "time_data": None, # Used for Interval and Date Triggers } - def __init__(self, name: str, guild_id, config: Config, author_id=None, bot: Red = None): + def __init__( + self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None + ): self.name = name self.guild_id = guild_id self.config = config self.bot = bot self.author_id = author_id + self.channel_id = channel_id self.data = None - async def _encode_time_data(self): + async def _encode_time_triggers(self): if not self.data or not self.data.get("triggers", None): - return None + return [] triggers = [] for t in self.data["triggers"]: if t["type"] == "interval": # Convert into timedelta td: timedelta = t["time_data"] - triggers.append({"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds} }) + triggers.append( + {"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}} + ) + continue if t["type"] == "date": # Convert into datetime dt: datetime = t["time_data"] - triggers.append({"type": t["type"], "time_data": { - "year": dt.year, - "month": dt.month, - "day": dt.day, - "hour": dt.hour, - "minute": dt.minute, - "second": dt.second, - }}) + triggers.append( + { + "type": t["type"], + "time_data": { + "year": dt.year, + "month": dt.month, + "day": dt.day, + "hour": dt.hour, + "minute": dt.minute, + "second": dt.second, + }, + } + ) + continue if t["type"] == "cron": raise NotImplemented @@ -93,16 +121,18 @@ class Task: return triggers - async def _decode_time_data(self): + async def _decode_time_triggers(self): if not self.data or not self.data.get("triggers", None): return - for t in self.data["triggers"]: + for n, t in enumerate(self.data["triggers"]): if t["type"] == "interval": # Convert into timedelta - t["time_data"] = timedelta(**t["time_data"]) + self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"]) + continue if t["type"] == "date": # Convert into datetime - t["time_data"] = datetime(**t["time_data"]) + self.data["triggers"][n]["time_data"] = datetime(**t["time_data"]) + continue if t["type"] == "cron": raise NotImplemented @@ -121,10 +151,11 @@ class Task: self.author_id = data["author_id"] self.guild_id = data["guild_id"] + self.channel_id = data["channel_id"] self.data = data["data"] - await self._decode_time_data() + await self._decode_time_triggers() return self.data async def get_trigger(self) -> Union[BaseTrigger, None]: @@ -145,11 +176,12 @@ class Task: data_to_save = self.default_task_data.copy() if self.data: data_to_save["command_str"] = self.data.get("command_str", "") - data_to_save["triggers"] = await self._encode_time_data() + data_to_save["triggers"] = await self._encode_time_triggers() to_save = { "guild_id": self.guild_id, "author_id": self.author_id, + "channel_id": self.channel_id, "data": data_to_save, } await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save) @@ -158,23 +190,54 @@ class Task: """To be used when updating triggers""" if not self.data: return + + data_to_save = self.data.copy() + data_to_save["triggers"] = await self._encode_time_triggers() + await self.config.guild_from_id(self.guild_id).tasks.set_raw( - self.name, "data", value=await self._encode_time_data() + self.name, "data", value=data_to_save ) async def execute(self): - if not self.data or self.data["command_str"]: + if not self.data or not self.data.get("command_str", False): + log.warning(f"Could not execute task due to data problem: {self.data=}") return False - message = FakeMessage() - message.guild = self.bot.get_guild(self.guild_id) # used for get_prefix - message.author = message.guild.get_member(self.author_id) - message.content = await self.bot.get_prefix(message) + self.data["command_str"] + + guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix + if guild is None: + log.warning(f"Could not execute task due to missing guild: {self.guild_id}") + return False + channel: discord.TextChannel = guild.get_channel(self.channel_id) + if channel is None: + log.warning(f"Could not execute task due to missing channel: {self.channel_id}") + return False + author: discord.User = guild.get_member(self.author_id) + if author is None: + log.warning(f"Could not execute task due to missing author: {self.author_id}") + return False + + message = channel.last_message + if message is None: + log.warning("No message found in channel cache yet, skipping execution") + return + message.author = author + + prefixes = await self.bot.get_prefix(message) + if isinstance(prefixes, str): + prefix = prefixes + else: + prefix = prefixes[0] + + message.content = f"{prefix}{self.data['command_str']}" if not message.guild or not message.author or not message.content: + log.warning(f"Could not execute task due to message problem: {message}") return False new_ctx: commands.Context = await self.bot.get_context(message) + new_ctx.assume_yes = True if not new_ctx.valid: + log.warning(f"Could not execute task due invalid context: {new_ctx}") return False await self.bot.invoke(new_ctx) @@ -203,6 +266,31 @@ class Task: self.data["triggers"].append(trigger_data) return True + def __setstate__(self, task_state): + self.name = task_state["name"] + self.guild_id = task_state["guild_id"] + self.config = task_state["config"] + self.bot = None + self.author_id = None + self.channel_id = None + self.data = None + + def __getstate__(self): + return { + "name": self.name, + "guild_id": self.guild_id, + "config": self.config, + "bot": self.bot, + } + + +def _assemble_job_id(task_name, guild_id): + return f"{task_name}_{guild_id}" + + +def _disassemble_job_id(job_id: str): + return job_id.split("_") + class FIFO(commands.Cog): """ @@ -222,6 +310,8 @@ class FIFO(commands.Cog): self.config.register_global(**default_global) self.config.register_guild(**default_guild) + from .redconfigjobstore import RedConfigJobStore + jobstores = {"default": RedConfigJobStore(self.config, self.bot)} job_defaults = {"coalesce": False, "max_instances": 1} @@ -229,7 +319,9 @@ class FIFO(commands.Cog): # executors = {"default": AsyncIOExecutor()} # Default executor is already AsyncIOExecutor - self.scheduler = AsyncIOScheduler(jobstores=jobstores, job_defaults=job_defaults) + self.scheduler = AsyncIOScheduler( + jobstores=jobstores, job_defaults=job_defaults, logger=schedule_log + ) self.scheduler.start() @@ -237,34 +329,48 @@ class FIFO(commands.Cog): """Nothing to delete""" return - def _assemble_job_id(self, task_name, guild_id): - return task_name + "_" + guild_id - async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str): - message = FakeMessage() + message: discord.Message = ctx.message + message.content = ctx.prefix + command_to_parse message.author = ctx.author - message.guild = ctx.guild new_ctx: commands.Context = await self.bot.get_context(message) return new_ctx.valid - async def _get_job(self, task_name, guild_id): - return self.scheduler.get_job(self._assemble_job_id(task_name, guild_id)) + async def _process_task(self, task: Task): + job = await self._get_job(task) + if job is not None: + job.remove() - async def _add_job(self, task): + return await self._add_job(task) + + async def _get_job(self, task: Task) -> Job: + return self.scheduler.get_job(_assemble_job_id(task.name, task.guild_id)) + + async def _add_job(self, task: Task): return self.scheduler.add_job( - task.execute, - id=self._assemble_job_id(task.name, task.guild_id), + _execute_task, + args=[task.__getstate__()], + id=_assemble_job_id(task.name, task.guild_id), trigger=await task.get_trigger(), ) + async def _pause_job(self, task: Task): + return self.scheduler.pause_job(job_id=_assemble_job_id(task.name, task.guild_id)) + + async def _remove_job(self, task: Task): + return self.scheduler.remove_job(job_id=_assemble_job_id(task.name, task.guild_id)) + @checks.is_owner() @commands.command() async def fifoclear(self, ctx: commands.Context): - """Debug command to clear fifo config""" + """Debug command to clear all current fifo data""" await self.config.guild(ctx.guild).tasks.clear() + await self.config.jobs.clear() + await self.config.jobs_index.clear() + self.scheduler.remove_all_jobs() await ctx.tick() @checks.is_owner() # Will be reduced when I figure out permissions later @@ -286,7 +392,15 @@ class FIFO(commands.Cog): if all_guilds: pass else: - pass # TODO: parse and display tasks + out = "" + all_tasks = await self.config.guild(ctx.guild).tasks() + for task_name, task_data in all_tasks.items(): + out += f"{task_name}: {task_data}\n" + + if out: + await ctx.maybe_send_embed(out) + else: + await ctx.maybe_send_embed("No tasks to list") @fifo.command(name="add") async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str): @@ -297,11 +411,17 @@ class FIFO(commands.Cog): await ctx.maybe_send_embed(f"Task already exists with {task_name=}") return + if "_" in task_name: # See _disassemble_job_id + await ctx.maybe_send_embed("Task name cannot contain underscores") + return + if not await self._check_parsable_command(ctx, command_to_execute): - await ctx.maybe_send_embed("Failed to parse command. Make sure to include the prefix") + await ctx.maybe_send_embed( + "Failed to parse command. Make sure not to include the prefix" + ) return - task = Task(task_name, ctx.guild.id, self.config, ctx.author.id) + task = Task(task_name, ctx.guild.id, self.config, ctx.author.id, ctx.channel.id, self.bot) await task.set_commmand_str(command_to_execute) await task.save_all() await ctx.tick() @@ -329,7 +449,7 @@ class FIFO(commands.Cog): Add an interval trigger to the specified task """ - task = Task(task_name, ctx.guild.id, self.config) + task = Task(task_name, ctx.guild.id, self.config, bot=self.bot) await task.load_from_config() if task.data is None: @@ -345,6 +465,7 @@ class FIFO(commands.Cog): ) return await task.save_data() + await self._process_task(task) await ctx.tick() @fifo_trigger.command(name="date") @@ -372,6 +493,7 @@ class FIFO(commands.Cog): return await task.save_data() + await self._process_task(task) await ctx.tick() @fifo_trigger.command(name="cron") diff --git a/fifo/info.json b/fifo/info.json index 4a9cd1c..6cc6f9d 100644 --- a/fifo/info.json +++ b/fifo/info.json @@ -8,6 +8,9 @@ "install_msg": "Thank you for installing FIFO.\nGet started with `[p]load fifo`, then `[p]help FIFO`", "short": "Schedule commands to be run by certain at certain times or intervals\"", "end_user_data_statement": "This cog does not store any End User Data", + "requirements": [ + "apscheduler" + ], "tags": [ "bobloy", "utilities", diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index 9db7213..4921dca 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -1,42 +1,96 @@ import asyncio +import base64 +import logging +import pickle +from apscheduler.job import Job from apscheduler.jobstores.base import ConflictingIdError, JobLookupError from apscheduler.jobstores.memory import MemoryJobStore from apscheduler.util import datetime_to_utc_timestamp from redbot.core import Config - # TODO: use get_lock on config from redbot.core.bot import Red +log = logging.getLogger("red.fox_v3.fifo.jobstore") +log.setLevel(logging.DEBUG) + +save_task_objects = [] class RedConfigJobStore(MemoryJobStore): def __init__(self, config: Config, bot: Red): super().__init__() self.config = config - # nest_asyncio.apply() self.bot = bot + self.pickle_protocol = pickle.HIGHEST_PROTOCOL asyncio.ensure_future(self._load_from_config(), loop=self.bot.loop) async def _load_from_config(self): self._jobs = await self.config.jobs() + self._jobs = [ + (await self._decode_job(job["job_state"]), timestamp) + for (job, timestamp) in self._jobs + ] self._jobs_index = await self.config.jobs_index.all() - - def add_job(self, job): + self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs} + + def _encode_job(self, job: Job): + log.info(f"Encoding job id: {job.id}") + job_state = job.__getstate__() + new_args = list(job_state["args"]) + new_args[0]["config"] = None + new_args[0]["bot"] = None + job_state["args"] = tuple(new_args) + encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol)) + out = { + "_id": job.id, + "next_run_time": datetime_to_utc_timestamp(job.next_run_time), + "job_state": encoded.decode("ascii"), + } + new_args = list(job_state["args"]) + new_args[0]["config"] = self.config + new_args[0]["bot"] = self.bot + job_state["args"] = tuple(new_args) + log.info(f"After encode: Check job args: {job.args=}") + return out + + async def _decode_job(self, job_state): + job_state = pickle.loads(base64.b64decode(job_state)) + new_args = list(job_state["args"]) + new_args[0]["config"] = self.config + new_args[0]["bot"] = self.bot + job_state["args"] = tuple(new_args) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + # task_name, guild_id = _disassemble_job_id(job.id) + # task = Task(task_name, guild_id, self.config) + # await task.load_from_config() + # save_task_objects.append(task) + # + # job.func = task.execute + + log.info(f"Decoded job id: {job.id}") + + return job + + def add_job(self, job: Job): if job.id in self._jobs_index: raise ConflictingIdError(job.id) - + log.info(f"Check job args: {job.args=}") timestamp = datetime_to_utc_timestamp(job.next_run_time) index = self._get_job_index(timestamp, job.id) # This is fine self._jobs.insert(index, (job, timestamp)) self._jobs_index[job.id] = (job, timestamp) asyncio.create_task(self._async_add_job(job, index, timestamp)) + log.info(f"Added job: {self._jobs[index][0].args}") async def _async_add_job(self, job, index, timestamp): async with self.config.jobs() as jobs: - jobs.insert(index, (job, timestamp)) - await self.config.jobs_index.set_raw(job.id, value=(job, timestamp)) + jobs.insert(index, (self._encode_job(job), timestamp)) + await self.config.jobs_index.set_raw(job.id, value=(self._encode_job(job), timestamp)) return True def update_job(self, job): @@ -48,22 +102,29 @@ class RedConfigJobStore(MemoryJobStore): # Otherwise, reinsert the job to the list to preserve the ordering. old_index = self._get_job_index(old_timestamp, old_job.id) new_timestamp = datetime_to_utc_timestamp(job.next_run_time) - asyncio.ensure_future(self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), loop=self.bot.loop) + asyncio.ensure_future( + self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), + loop=self.bot.loop, + ) async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): + encoded_job = self._encode_job(job) if old_timestamp == new_timestamp: self._jobs[old_index] = (job, new_timestamp) async with self.config.jobs() as jobs: - jobs[old_index] = (job, new_timestamp) + jobs[old_index] = (encoded_job, new_timestamp) else: del self._jobs[old_index] new_index = self._get_job_index(new_timestamp, job.id) # This is fine self._jobs.insert(new_index, (job, new_timestamp)) async with self.config.jobs() as jobs: del jobs[old_index] - jobs.insert(new_index, (job, new_timestamp)) + jobs.insert(new_index, (encoded_job, new_timestamp)) self._jobs_index[old_job.id] = (job, new_timestamp) - await self.config.jobs_index.set_raw(old_job.id, value=(job, new_timestamp)) + await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) + + log.info(f"Async Updated {job.id=}") + log.info(f"Check job args: {job.args=}") def remove_job(self, job_id): job, timestamp = self._jobs_index.get(job_id, (None, None)) diff --git a/fifo/redjob.py b/fifo/redjob.py new file mode 100644 index 0000000..c276aa4 --- /dev/null +++ b/fifo/redjob.py @@ -0,0 +1,44 @@ +import six +from apscheduler.job import Job +from apscheduler.schedulers.asyncio import AsyncIOScheduler +from apscheduler.schedulers.base import STATE_STOPPED +from apscheduler.util import undefined + + +class RedJob(Job): + pass + + +class RedAsyncIOScheduler(AsyncIOScheduler): + + def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, + misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, + next_run_time=undefined, jobstore='default', executor='default', + replace_existing=False, **trigger_args): + job_kwargs = { + 'trigger': self._create_trigger(trigger, trigger_args), + 'executor': executor, + 'func': func, + 'args': tuple(args) if args is not None else (), + 'kwargs': dict(kwargs) if kwargs is not None else {}, + 'id': id, + 'name': name, + 'misfire_grace_time': misfire_grace_time, + 'coalesce': coalesce, + 'max_instances': max_instances, + 'next_run_time': next_run_time + } + job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if + value is not undefined) + job = RedJob(self, **job_kwargs) + + # Don't really add jobs to job stores before the scheduler is up and running + with self._jobstores_lock: + if self.state == STATE_STOPPED: + self._pending_jobs.append((job, jobstore, replace_existing)) + self._logger.info('Adding job tentatively -- it will be properly scheduled when ' + 'the scheduler starts') + else: + self._real_add_job(job, jobstore, replace_existing) + + return job