Merge branch 'master' into cogguide_develop

bobloy 4 years ago
commit 59c6d303f7

fifo/fifo.py
@@ -1,6 +1,6 @@
 import itertools
 import logging
-from datetime import datetime, timedelta, tzinfo, MAXYEAR
+from datetime import MAXYEAR, datetime, timedelta, tzinfo
 from typing import Optional, Union

 import discord
@@ -11,7 +11,7 @@ from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING
 from redbot.core import Config, checks, commands
 from redbot.core.bot import Red
 from redbot.core.commands import TimedeltaConverter
-from redbot.core.utils.chat_formatting import humanize_list, humanize_timedelta, pagify
+from redbot.core.utils.chat_formatting import humanize_timedelta, pagify

 from .datetime_cron_converters import CronConverter, DatetimeConverter, TimezoneConverter
 from .task import Task
@@ -22,8 +22,8 @@ schedule_log.setLevel(logging.DEBUG)

 log = logging.getLogger("red.fox_v3.fifo")


-async def _execute_task(task_state):
-    log.info(f"Executing {task_state=}")
+async def _execute_task(**task_state):
+    log.info(f"Executing {task_state.get('name')}")
     task = Task(**task_state)
     if await task.load_from_config():
         return await task.execute()
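Aside: switching from one positional `task_state` dict to `**task_state` lets the jobstore persist the state as the job's `kwargs`, which APScheduler unpacks when the job fires. A minimal sketch of that calling convention (the state keys here are illustrative, not the cog's full set):

```python
import asyncio

async def _execute_task(**task_state):
    # The scheduler stores kwargs=<state dict> and unpacks it on every fire
    print(f"Executing {task_state.get('name')}")

asyncio.run(_execute_task(name="demo", guild_id=0))
```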
@@ -60,6 +60,19 @@ def _get_run_times(job: Job, now: datetime = None):
         next_run_time = job.trigger.get_next_fire_time(next_run_time, now)


+class CapturePrint:
+    """Silly little class to get `print` output"""
+
+    def __init__(self):
+        self.string = None
+
+    def write(self, string):
+        if self.string is None:
+            self.string = string
+        else:
+            self.string = self.string + "\n" + string
+
+
 class FIFO(commands.Cog):
     """
     Simple Scheduling Cog
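`CapturePrint` works because `print()` (and APScheduler's `print_jobs(out=...)`) only need a file-like object with a `write(string)` method. A quick sketch of that duck typing, assuming the class above:

```python
cp = CapturePrint()
print("jobstore: default", file=cp)  # print() calls cp.write() once for the text, once for the "\n" end
print("2 jobs pending", file=cp)
assert "jobstore: default" in cp.string
```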
@@ -78,7 +91,7 @@ class FIFO(commands.Cog):
         self.config.register_global(**default_global)
         self.config.register_guild(**default_guild)

-        self.scheduler = None
+        self.scheduler: Optional[AsyncIOScheduler] = None
         self.jobstore = None

         self.tz_cog = None
@@ -94,17 +107,22 @@ class FIFO(commands.Cog):

     async def initialize(self):

-        job_defaults = {"coalesce": False, "max_instances": 1}
+        job_defaults = {
+            "coalesce": True,  # Multiple missed triggers within the grace time will only fire once
+            "max_instances": 5,  # This is probably way too high, should likely only be one
+            "misfire_grace_time": 15,  # 15 seconds ain't much, but it's honest work
+            "replace_existing": True,  # Very important for persistent data
+        }

         # executors = {"default": AsyncIOExecutor()}
         # Default executor is already AsyncIOExecutor

         self.scheduler = AsyncIOScheduler(job_defaults=job_defaults, logger=schedule_log)

-        from .redconfigjobstore import RedConfigJobStore
+        from .redconfigjobstore import RedConfigJobStore  # Wait to import to prevent cyclic import

         self.jobstore = RedConfigJobStore(self.config, self.bot)
-        await self.jobstore.load_from_config(self.scheduler, "default")
+        await self.jobstore.load_from_config()
         self.scheduler.add_jobstore(self.jobstore, "default")
         self.scheduler.start()
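For reference, APScheduler 3.x only reads `coalesce`, `max_instances`, and `misfire_grace_time` out of `job_defaults`; `replace_existing` is an argument to `add_job()`, so keeping it in the defaults dict likely has no effect there. A hedged sketch of the split (assuming APScheduler 3.6):

```python
from apscheduler.schedulers.asyncio import AsyncIOScheduler

scheduler = AsyncIOScheduler(
    job_defaults={"coalesce": True, "max_instances": 1, "misfire_grace_time": 15}
)
# replace_existing is applied per add_job() call, when re-adding a job with a known id:
scheduler.add_job(print, "interval", seconds=60, args=["tick"], id="heartbeat", replace_existing=True)
```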
@@ -139,9 +157,10 @@ class FIFO(commands.Cog):

     async def _add_job(self, task: Task):
         return self.scheduler.add_job(
             _execute_task,
-            args=[task.__getstate__()],
+            kwargs=task.__getstate__(),
             id=_assemble_job_id(task.name, task.guild_id),
             trigger=await task.get_combined_trigger(),
+            name=task.name,
         )

     async def _resume_job(self, task: Task):
@@ -372,7 +391,7 @@ class FIFO(commands.Cog):
         Do `[p]fifo list True` to see tasks from all guilds
         """
         if all_guilds:
-            pass
+            pass  # TODO: All guilds
         else:
             out = ""
             all_tasks = await self.config.guild(ctx.guild).tasks()
@@ -388,6 +407,27 @@ class FIFO(commands.Cog):
         else:
             await ctx.maybe_send_embed("No tasks to list")

+    @fifo.command(name="printschedule")
+    async def fifo_printschedule(self, ctx: commands.Context):
+        """
+        Print the current schedule of execution.
+
+        Useful for debugging.
+        """
+        cp = CapturePrint()
+        self.scheduler.print_jobs(out=cp)
+
+        out = cp.string
+
+        if out:
+            if len(out) > 2000:
+                for page in pagify(out):
+                    await ctx.maybe_send_embed(page)
+            else:
+                await ctx.maybe_send_embed(out)
+        else:
+            await ctx.maybe_send_embed("Failed to get schedule from scheduler")
+
     @fifo.command(name="add")
     async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str):
         """
@@ -467,7 +507,7 @@ class FIFO(commands.Cog):
         """
         task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
-        await task.load_from_config()
+        await task.load_from_config()  # Will set the channel and author

         if task.data is None:
             await ctx.maybe_send_embed(
@@ -497,7 +537,7 @@ class FIFO(commands.Cog):
         Add a "run once" trigger at a time relative from now to the specified task
         """
-        task = Task(task_name, ctx.guild.id, self.config)
+        task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
         await task.load_from_config()

         if task.data is None:
@@ -531,7 +571,7 @@ class FIFO(commands.Cog):
         Add a "run once" datetime trigger to the specified task
         """
-        task = Task(task_name, ctx.guild.id, self.config)
+        task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
         await task.load_from_config()

         if task.data is None:
@@ -571,7 +611,7 @@ class FIFO(commands.Cog):
         See https://crontab.guru/ for help generating the cron_str
         """
-        task = Task(task_name, ctx.guild.id, self.config)
+        task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
         await task.load_from_config()

         if task.data is None:

fifo/redconfigjobstore.py
@@ -2,17 +2,13 @@ import asyncio
 import base64
 import logging
 import pickle
-from datetime import datetime
-from typing import Tuple, Union

 from apscheduler.job import Job
-from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
 from apscheduler.jobstores.memory import MemoryJobStore
 from apscheduler.schedulers.asyncio import run_in_event_loop
 from apscheduler.util import datetime_to_utc_timestamp
 from redbot.core import Config

-# TODO: use get_lock on config maybe
+# TODO: use get_lock on config
 from redbot.core.bot import Red
 from redbot.core.utils import AsyncIter
@@ -28,44 +24,56 @@ class RedConfigJobStore(MemoryJobStore):
         self.config = config
         self.bot = bot
         self.pickle_protocol = pickle.HIGHEST_PROTOCOL
-        self._eventloop = self.bot.loop
-        # task = asyncio.create_task(self.load_from_config())
-        # while not task.done():
-        #     sleep(0.1)
-        # future = asyncio.ensure_future(self.load_from_config(), loop=self.bot.loop)
+        self._eventloop = self.bot.loop  # Used for @run_in_event_loop
+        # TODO: self.config.jobs_index is never used,
+        #  fine but maybe a sign of inefficient use of config

     @run_in_event_loop
     def start(self, scheduler, alias):
         super().start(scheduler, alias)
+        for job, timestamp in self._jobs:
+            job._scheduler = self._scheduler
+            job._jobstore_alias = self._alias

-    async def load_from_config(self, scheduler, alias):
-        super().start(scheduler, alias)
+    async def load_from_config(self):
         _jobs = await self.config.jobs()
-        self._jobs = [
-            (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
-        ]
+        # self._jobs = [
+        #     (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
+        # ]
+        async for job, timestamp in AsyncIter(_jobs):
+            job = await self._decode_job(job)
+            index = self._get_job_index(timestamp, job.id)
+            self._jobs.insert(index, (job, timestamp))
+            self._jobs_index[job.id] = (job, timestamp)
+
+    async def save_to_config(self):
+        """Yea that's basically it"""
+        await self.config.jobs.set(
+            [(self._encode_job(job), timestamp) for job, timestamp in self._jobs]
+        )
         # self._jobs_index = await self.config.jobs_index.all()  # Overwritten by next
-        self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
+        # self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}

     def _encode_job(self, job: Job):
         job_state = job.__getstate__()
-        new_args = list(job_state["args"])
-        new_args[0]["config"] = None
-        new_args[0]["bot"] = None
-        job_state["args"] = tuple(new_args)
+        job_state["kwargs"]["config"] = None
+        job_state["kwargs"]["bot"] = None
+        # new_kwargs = job_state["kwargs"]
+        # new_kwargs["config"] = None
+        # new_kwargs["bot"] = None
+        # job_state["kwargs"] = new_kwargs
         encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol))
         out = {
             "_id": job.id,
             "next_run_time": datetime_to_utc_timestamp(job.next_run_time),
             "job_state": encoded.decode("ascii"),
         }
-        new_args = list(job_state["args"])
-        new_args[0]["config"] = self.config
-        new_args[0]["bot"] = self.bot
-        job_state["args"] = tuple(new_args)
+        job_state["kwargs"]["config"] = self.config
+        job_state["kwargs"]["bot"] = self.bot
+        # new_kwargs = job_state["kwargs"]
+        # new_kwargs["config"] = self.config
+        # new_kwargs["bot"] = self.bot
+        # job_state["kwargs"] = new_kwargs
         # log.debug(f"Encoding job id: {job.id}\n"
         #           f"Encoded as: {out}")
@@ -76,10 +84,15 @@ class RedConfigJobStore(MemoryJobStore):
             return None
         job_state = in_job["job_state"]
         job_state = pickle.loads(base64.b64decode(job_state))
-        new_args = list(job_state["args"])
-        new_args[0]["config"] = self.config
-        new_args[0]["bot"] = self.bot
-        job_state["args"] = tuple(new_args)
+        if job_state["args"]:  # Backwards compatibility on args to kwargs
+            job_state["kwargs"] = {**job_state["args"][0]}
+            job_state["args"] = []
+        job_state["kwargs"]["config"] = self.config
+        job_state["kwargs"]["bot"] = self.bot
+        # new_kwargs = job_state["kwargs"]
+        # new_kwargs["config"] = self.config
+        # new_kwargs["bot"] = self.bot
+        # job_state["kwargs"] = new_kwargs
         job = Job.__new__(Job)
         job.__setstate__(job_state)
         job._scheduler = self._scheduler
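Underneath, `_encode_job`/`_decode_job` are a pickle-in-base64 round trip so job state can live inside Red's JSON-backed Config; the unpicklable `config` and `bot` objects are nulled before dumping and re-attached after loading. A standalone sketch of the idea:

```python
import base64
import pickle

state = {"kwargs": {"name": "demo", "config": None, "bot": None}, "args": []}

encoded = base64.b64encode(pickle.dumps(state, pickle.HIGHEST_PROTOCOL)).decode("ascii")
restored = pickle.loads(base64.b64decode(encoded))

restored["kwargs"]["config"] = object()  # stand-ins for the live Config and Red objects
restored["kwargs"]["bot"] = object()
```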
@@ -96,78 +109,82 @@ class RedConfigJobStore(MemoryJobStore):
         return job

-    @run_in_event_loop
-    def add_job(self, job: Job):
-        if job.id in self._jobs_index:
-            raise ConflictingIdError(job.id)
-        # log.debug(f"Check job args: {job.args=}")
-        timestamp = datetime_to_utc_timestamp(job.next_run_time)
-        index = self._get_job_index(timestamp, job.id)  # This is fine
-        self._jobs.insert(index, (job, timestamp))
-        self._jobs_index[job.id] = (job, timestamp)
-        asyncio.create_task(self._async_add_job(job, index, timestamp))
-        # log.debug(f"Added job: {self._jobs[index][0].args}")
-
-    async def _async_add_job(self, job, index, timestamp):
-        encoded_job = self._encode_job(job)
-        job_tuple = tuple([encoded_job, timestamp])
-        async with self.config.jobs() as jobs:
-            jobs.insert(index, job_tuple)
-        # await self.config.jobs_index.set_raw(job.id, value=job_tuple)
-        return True
+    # @run_in_event_loop
+    # def add_job(self, job: Job):
+    #     if job.id in self._jobs_index:
+    #         raise ConflictingIdError(job.id)
+    #     # log.debug(f"Check job args: {job.args=}")
+    #     timestamp = datetime_to_utc_timestamp(job.next_run_time)
+    #     index = self._get_job_index(timestamp, job.id)  # This is fine
+    #     self._jobs.insert(index, (job, timestamp))
+    #     self._jobs_index[job.id] = (job, timestamp)
+    #     task = asyncio.create_task(self._async_add_job(job, index, timestamp))
+    #     self._eventloop.run_until_complete(task)
+    #     # log.debug(f"Added job: {self._jobs[index][0].args}")
+    #
+    # async def _async_add_job(self, job, index, timestamp):
+    #     encoded_job = self._encode_job(job)
+    #     job_tuple = tuple([encoded_job, timestamp])
+    #     async with self.config.jobs() as jobs:
+    #         jobs.insert(index, job_tuple)
+    #     # await self.config.jobs_index.set_raw(job.id, value=job_tuple)
+    #     return True

-    @run_in_event_loop
-    def update_job(self, job):
-        old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
-            job.id, (None, None)
-        )
-        old_job = old_tuple[0]
-        old_timestamp = old_tuple[1]
-        if old_job is None:
-            raise JobLookupError(job.id)
-
-        # If the next run time has not changed, simply replace the job in its present index.
-        # Otherwise, reinsert the job to the list to preserve the ordering.
-        old_index = self._get_job_index(old_timestamp, old_job.id)
-        new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
-        asyncio.create_task(
-            self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
-        )
-
-    async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
-        encoded_job = self._encode_job(job)
-        if old_timestamp == new_timestamp:
-            self._jobs[old_index] = (job, new_timestamp)
-            async with self.config.jobs() as jobs:
-                jobs[old_index] = (encoded_job, new_timestamp)
-        else:
-            del self._jobs[old_index]
-            new_index = self._get_job_index(new_timestamp, job.id)  # This is fine
-            self._jobs.insert(new_index, (job, new_timestamp))
-            async with self.config.jobs() as jobs:
-                del jobs[old_index]
-                jobs.insert(new_index, (encoded_job, new_timestamp))
-        self._jobs_index[old_job.id] = (job, new_timestamp)
-        # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
-
-        log.debug(f"Async Updated {job.id=}")
-        log.debug(f"Check job args: {job.args=}")
+    # @run_in_event_loop
+    # def update_job(self, job):
+    #     old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
+    #         job.id, (None, None)
+    #     )
+    #     old_job = old_tuple[0]
+    #     old_timestamp = old_tuple[1]
+    #     if old_job is None:
+    #         raise JobLookupError(job.id)
+    #
+    #     # If the next run time has not changed, simply replace the job in its present index.
+    #     # Otherwise, reinsert the job to the list to preserve the ordering.
+    #     old_index = self._get_job_index(old_timestamp, old_job.id)
+    #     new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
+    #     task = asyncio.create_task(
+    #         self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
+    #     )
+    #     self._eventloop.run_until_complete(task)
+    #
+    # async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
+    #     encoded_job = self._encode_job(job)
+    #     if old_timestamp == new_timestamp:
+    #         self._jobs[old_index] = (job, new_timestamp)
+    #         async with self.config.jobs() as jobs:
+    #             jobs[old_index] = (encoded_job, new_timestamp)
+    #     else:
+    #         del self._jobs[old_index]
+    #         new_index = self._get_job_index(new_timestamp, job.id)  # This is fine
+    #         self._jobs.insert(new_index, (job, new_timestamp))
+    #         async with self.config.jobs() as jobs:
+    #             del jobs[old_index]
+    #             jobs.insert(new_index, (encoded_job, new_timestamp))
+    #     self._jobs_index[old_job.id] = (job, new_timestamp)
+    #     # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
+    #
+    #     log.debug(f"Async Updated {job.id=}")
+    #     # log.debug(f"Check job args: {job.kwargs=}")

-    @run_in_event_loop
-    def remove_job(self, job_id):
-        job, timestamp = self._jobs_index.get(job_id, (None, None))
-        if job is None:
-            raise JobLookupError(job_id)
-
-        index = self._get_job_index(timestamp, job_id)
-        del self._jobs[index]
-        del self._jobs_index[job.id]
-        asyncio.create_task(self._async_remove_job(index, job))
-
-    async def _async_remove_job(self, index, job):
-        async with self.config.jobs() as jobs:
-            del jobs[index]
-        # await self.config.jobs_index.clear_raw(job.id)
+    # @run_in_event_loop
+    # def remove_job(self, job_id):
+    #     """Copied instead of super for the asyncio args"""
+    #     job, timestamp = self._jobs_index.get(job_id, (None, None))
+    #     if job is None:
+    #         raise JobLookupError(job_id)
+    #
+    #     index = self._get_job_index(timestamp, job_id)
+    #     del self._jobs[index]
+    #     del self._jobs_index[job.id]
+    #     task = asyncio.create_task(self._async_remove_job(index, job))
+    #     self._eventloop.run_until_complete(task)
+    #
+    # async def _async_remove_job(self, index, job):
+    #     async with self.config.jobs() as jobs:
+    #         del jobs[index]
+    #     # await self.config.jobs_index.clear_raw(job.id)

     @run_in_event_loop
     def remove_all_jobs(self):
@@ -180,4 +197,8 @@ class RedConfigJobStore(MemoryJobStore):

     def shutdown(self):
         """Removes all jobs without clearing config"""
+        asyncio.create_task(self.async_shutdown())
+
+    async def async_shutdown(self):
+        await self.save_to_config()
         super().remove_all_jobs()
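Worth noting: `shutdown()` is synchronous, so the save is scheduled rather than awaited; `asyncio.create_task()` only queues `async_shutdown()`, which completes only if the event loop keeps running long enough. A toy illustration of that fire-and-forget pattern (not the cog's code):

```python
import asyncio

async def save_state():
    await asyncio.sleep(0)  # stand-in for the Config write
    print("state saved")

def shutdown():
    # Must be called from inside a running loop; the task finishes later, or never
    asyncio.create_task(save_state())

async def main():
    shutdown()
    await asyncio.sleep(0.1)  # without some grace period the pending save could be lost

asyncio.run(main())
```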

fifo/task.py
@@ -40,7 +40,7 @@ def parse_triggers(data: Union[Dict, None]):

     if len(data["triggers"]) > 1:  # Multiple triggers
         return OrTrigger([get_trigger(t_data) for t_data in data["triggers"]])
-    else:
-        return get_trigger(data["triggers"][0])
+
+    return get_trigger(data["triggers"][0])
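For context, `OrTrigger` fires at the earliest of its children's next fire times, which is how a task with several triggers collapses into one combined schedule. A small sketch (APScheduler 3.x combining triggers):

```python
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger

# Fires daily at 07:00 OR every 6 hours, whichever comes first each round
combined = OrTrigger([CronTrigger(hour=7), IntervalTrigger(hours=6)])
```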
@@ -166,7 +166,7 @@ class Task:
             return

         self.author_id = data["author_id"]
-        self.guild_id = data["guild_id"]
+        self.guild_id = data["guild_id"]  # Weird I'm doing this, since self.guild_id was just used
         self.channel_id = data["channel_id"]
         self.data = data["data"]

fifo/timezones.py
@@ -5,6 +5,8 @@ All credit to https://github.com/prefrontal/dateutil-parser-timezones
 """

 # from dateutil.tz import gettz
+from datetime import datetime
+
 from pytz import timezone
@@ -227,4 +229,6 @@ def assemble_timezones():
     timezones["YAKT"] = timezone("Asia/Yakutsk")  # Yakutsk Time (UTC+09)
     timezones["YEKT"] = timezone("Asia/Yekaterinburg")  # Yekaterinburg Time (UTC+05)

+    dt = datetime(2020, 1, 1)
+    timezones.update((x, y.localize(dt).tzinfo) for x, y in timezones.items())
     return timezones
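The new `localize` pass matters because a raw pytz `timezone(...)` attached directly as a `tzinfo` applies the zone's historical local-mean-time offset; localizing a reference date resolves the real UTC offset. One caveat the date pin implies: DST-observing zones get their January 2020 offset year-round. A sketch:

```python
from datetime import datetime
from pytz import timezone

tz = timezone("Asia/Yekaterinburg")
dt = datetime(2020, 1, 1)

wrong = dt.replace(tzinfo=tz)  # attaches the zone's historical LMT offset
right = tz.localize(dt)        # resolves the UTC+05 offset in force on that date
fixed_tzinfo = right.tzinfo    # what assemble_timezones() now stores
```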
