You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Fox-V3/fifo/redconfigjobstore.py

223 lines
8.7 KiB

import asyncio
import base64
import logging
import pickle
from datetime import datetime
from typing import Tuple, Union
from apscheduler.job import Job
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.schedulers.asyncio import run_in_event_loop
from apscheduler.util import datetime_to_utc_timestamp
from redbot.core import Config
# TODO: use get_lock on config
from redbot.core.bot import Red
from redbot.core.utils import AsyncIter
log = logging.getLogger("red.fox_v3.fifo.jobstore")
log.setLevel(logging.DEBUG)
save_task_objects = []
class RedConfigJobStore(MemoryJobStore):
def __init__(self, config: Config, bot: Red):
super().__init__()
self.config = config
self.bot = bot
self.pickle_protocol = pickle.HIGHEST_PROTOCOL
4 years ago
self._eventloop = self.bot.loop # Used for @run_in_event_loop
# TODO: self.config.jobs_index is never used,
# fine but maybe a sign of inefficient use of config
# task = asyncio.create_task(self.load_from_config())
# while not task.done():
# sleep(0.1)
# future = asyncio.ensure_future(self.load_from_config(), loop=self.bot.loop)
@run_in_event_loop
def start(self, scheduler, alias):
super().start(scheduler, alias)
4 years ago
for job, timestamp in self._jobs:
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
4 years ago
async def load_from_config(self):
_jobs = await self.config.jobs()
4 years ago
# self._jobs = [
# (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
# ]
async for job, timestamp in AsyncIter(_jobs):
job = await self._decode_job(job)
index = self._get_job_index(timestamp, job.id)
self._jobs.insert(index, (job, timestamp))
self._jobs_index[job.id] = (job, timestamp)
async def save_to_config(self):
"""Yea that's basically it"""
await self.config.jobs.set(
[(self._encode_job(job), timestamp) for job, timestamp in self._jobs]
)
# self._jobs_index = await self.config.jobs_index.all() # Overwritten by next
4 years ago
# self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
def _encode_job(self, job: Job):
job_state = job.__getstate__()
4 years ago
job_state["kwargs"]["config"] = None
job_state["kwargs"]["bot"] = None
# new_kwargs = job_state["kwargs"]
# new_kwargs["config"] = None
# new_kwargs["bot"] = None
# job_state["kwargs"] = new_kwargs
encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol))
out = {
"_id": job.id,
"next_run_time": datetime_to_utc_timestamp(job.next_run_time),
"job_state": encoded.decode("ascii"),
}
4 years ago
job_state["kwargs"]["config"] = self.config
job_state["kwargs"]["bot"] = self.bot
# new_kwargs = job_state["kwargs"]
# new_kwargs["config"] = self.config
# new_kwargs["bot"] = self.bot
# job_state["kwargs"] = new_kwargs
# log.debug(f"Encoding job id: {job.id}\n"
# f"Encoded as: {out}")
return out
async def _decode_job(self, in_job):
if in_job is None:
return None
job_state = in_job["job_state"]
job_state = pickle.loads(base64.b64decode(job_state))
4 years ago
if job_state["args"]: # Backwards compatibility on args to kwargs
try:
job_state["kwargs"] = {
"name": job_state["args"][0],
"guild_id": job_state["args"][1],
"author_id": job_state["args"][2],
"channel_id": job_state["args"][3],
"bot": job_state["args"][4],
}
except IndexError as e:
raise Exception(job_state["args"]) from e
4 years ago
job_state["kwargs"]["config"] = self.config
job_state["kwargs"]["bot"] = self.bot
# new_kwargs = job_state["kwargs"]
# new_kwargs["config"] = self.config
# new_kwargs["bot"] = self.bot
# job_state["kwargs"] = new_kwargs
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
# task_name, guild_id = _disassemble_job_id(job.id)
# task = Task(task_name, guild_id, self.config)
# await task.load_from_config()
# save_task_objects.append(task)
#
# job.func = task.execute
# log.debug(f"Decoded job id: {job.id}\n"
# f"Decoded as {job_state}")
return job
4 years ago
# @run_in_event_loop
# def add_job(self, job: Job):
# if job.id in self._jobs_index:
# raise ConflictingIdError(job.id)
# # log.debug(f"Check job args: {job.args=}")
# timestamp = datetime_to_utc_timestamp(job.next_run_time)
# index = self._get_job_index(timestamp, job.id) # This is fine
# self._jobs.insert(index, (job, timestamp))
# self._jobs_index[job.id] = (job, timestamp)
# task = asyncio.create_task(self._async_add_job(job, index, timestamp))
# self._eventloop.run_until_complete(task)
# # log.debug(f"Added job: {self._jobs[index][0].args}")
#
# async def _async_add_job(self, job, index, timestamp):
# encoded_job = self._encode_job(job)
# job_tuple = tuple([encoded_job, timestamp])
# async with self.config.jobs() as jobs:
# jobs.insert(index, job_tuple)
# # await self.config.jobs_index.set_raw(job.id, value=job_tuple)
# return True
# @run_in_event_loop
# def update_job(self, job):
# old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
# job.id, (None, None)
# )
# old_job = old_tuple[0]
# old_timestamp = old_tuple[1]
# if old_job is None:
# raise JobLookupError(job.id)
#
# # If the next run time has not changed, simply replace the job in its present index.
# # Otherwise, reinsert the job to the list to preserve the ordering.
# old_index = self._get_job_index(old_timestamp, old_job.id)
# new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
# task = asyncio.create_task(
# self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
# )
# self._eventloop.run_until_complete(task)
#
# async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
# encoded_job = self._encode_job(job)
# if old_timestamp == new_timestamp:
# self._jobs[old_index] = (job, new_timestamp)
# async with self.config.jobs() as jobs:
# jobs[old_index] = (encoded_job, new_timestamp)
# else:
# del self._jobs[old_index]
# new_index = self._get_job_index(new_timestamp, job.id) # This is fine
# self._jobs.insert(new_index, (job, new_timestamp))
# async with self.config.jobs() as jobs:
# del jobs[old_index]
# jobs.insert(new_index, (encoded_job, new_timestamp))
# self._jobs_index[old_job.id] = (job, new_timestamp)
# # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
#
# log.debug(f"Async Updated {job.id=}")
# # log.debug(f"Check job args: {job.kwargs=}")
# @run_in_event_loop
# def remove_job(self, job_id):
# """Copied instead of super for the asyncio args"""
# job, timestamp = self._jobs_index.get(job_id, (None, None))
# if job is None:
# raise JobLookupError(job_id)
#
# index = self._get_job_index(timestamp, job_id)
# del self._jobs[index]
# del self._jobs_index[job.id]
# task = asyncio.create_task(self._async_remove_job(index, job))
# self._eventloop.run_until_complete(task)
#
# async def _async_remove_job(self, index, job):
# async with self.config.jobs() as jobs:
# del jobs[index]
# # await self.config.jobs_index.clear_raw(job.id)
@run_in_event_loop
def remove_all_jobs(self):
super().remove_all_jobs()
asyncio.create_task(self._async_remove_all_jobs())
async def _async_remove_all_jobs(self):
await self.config.jobs.clear()
# await self.config.jobs_index.clear()
def shutdown(self):
"""Removes all jobs without clearing config"""
4 years ago
asyncio.create_task(self.async_shutdown())
async def async_shutdown(self):
await self.save_to_config()
super().remove_all_jobs()