|
|
@ -28,7 +28,7 @@ class RedConfigJobStore(MemoryJobStore):
|
|
|
|
self.config = config
|
|
|
|
self.config = config
|
|
|
|
self.bot = bot
|
|
|
|
self.bot = bot
|
|
|
|
self.pickle_protocol = pickle.HIGHEST_PROTOCOL
|
|
|
|
self.pickle_protocol = pickle.HIGHEST_PROTOCOL
|
|
|
|
self._eventloop = self.bot.loop
|
|
|
|
self._eventloop = self.bot.loop # Used for @run_in_event_loop
|
|
|
|
# TODO: self.config.jobs_index is never used,
|
|
|
|
# TODO: self.config.jobs_index is never used,
|
|
|
|
# fine but maybe a sign of inefficient use of config
|
|
|
|
# fine but maybe a sign of inefficient use of config
|
|
|
|
|
|
|
|
|
|
|
@ -40,32 +40,50 @@ class RedConfigJobStore(MemoryJobStore):
|
|
|
|
@run_in_event_loop
|
|
|
|
@run_in_event_loop
|
|
|
|
def start(self, scheduler, alias):
|
|
|
|
def start(self, scheduler, alias):
|
|
|
|
super().start(scheduler, alias)
|
|
|
|
super().start(scheduler, alias)
|
|
|
|
|
|
|
|
for job, timestamp in self._jobs:
|
|
|
|
|
|
|
|
job._scheduler = self._scheduler
|
|
|
|
|
|
|
|
job._jobstore_alias = self._alias
|
|
|
|
|
|
|
|
|
|
|
|
async def load_from_config(self, scheduler, alias):
|
|
|
|
async def load_from_config(self):
|
|
|
|
super().start(scheduler, alias)
|
|
|
|
|
|
|
|
_jobs = await self.config.jobs()
|
|
|
|
_jobs = await self.config.jobs()
|
|
|
|
self._jobs = [
|
|
|
|
# self._jobs = [
|
|
|
|
(await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
|
|
|
|
# (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
|
|
|
|
]
|
|
|
|
# ]
|
|
|
|
|
|
|
|
async for job, timestamp in AsyncIter(_jobs):
|
|
|
|
|
|
|
|
job = await self._decode_job(job)
|
|
|
|
|
|
|
|
index = self._get_job_index(timestamp, job.id)
|
|
|
|
|
|
|
|
self._jobs.insert(index, (job, timestamp))
|
|
|
|
|
|
|
|
self._jobs_index[job.id] = (job, timestamp)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def save_to_config(self):
|
|
|
|
|
|
|
|
"""Yea that's basically it"""
|
|
|
|
|
|
|
|
await self.config.jobs.set(
|
|
|
|
|
|
|
|
[(self._encode_job(job), timestamp) for job, timestamp in self._jobs]
|
|
|
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
# self._jobs_index = await self.config.jobs_index.all() # Overwritten by next
|
|
|
|
# self._jobs_index = await self.config.jobs_index.all() # Overwritten by next
|
|
|
|
self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
|
|
|
|
# self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
|
|
|
|
|
|
|
|
|
|
|
|
def _encode_job(self, job: Job):
|
|
|
|
def _encode_job(self, job: Job):
|
|
|
|
job_state = job.__getstate__()
|
|
|
|
job_state = job.__getstate__()
|
|
|
|
new_args = list(job_state["args"])
|
|
|
|
job_state["kwargs"]["config"] = None
|
|
|
|
new_args[0]["config"] = None
|
|
|
|
job_state["kwargs"]["bot"] = None
|
|
|
|
new_args[0]["bot"] = None
|
|
|
|
# new_kwargs = job_state["kwargs"]
|
|
|
|
job_state["args"] = tuple(new_args)
|
|
|
|
# new_kwargs["config"] = None
|
|
|
|
|
|
|
|
# new_kwargs["bot"] = None
|
|
|
|
|
|
|
|
# job_state["kwargs"] = new_kwargs
|
|
|
|
encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol))
|
|
|
|
encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol))
|
|
|
|
out = {
|
|
|
|
out = {
|
|
|
|
"_id": job.id,
|
|
|
|
"_id": job.id,
|
|
|
|
"next_run_time": datetime_to_utc_timestamp(job.next_run_time),
|
|
|
|
"next_run_time": datetime_to_utc_timestamp(job.next_run_time),
|
|
|
|
"job_state": encoded.decode("ascii"),
|
|
|
|
"job_state": encoded.decode("ascii"),
|
|
|
|
}
|
|
|
|
}
|
|
|
|
new_args = list(job_state["args"])
|
|
|
|
job_state["kwargs"]["config"] = self.config
|
|
|
|
new_args[0]["config"] = self.config
|
|
|
|
job_state["kwargs"]["bot"] = self.bot
|
|
|
|
new_args[0]["bot"] = self.bot
|
|
|
|
# new_kwargs = job_state["kwargs"]
|
|
|
|
job_state["args"] = tuple(new_args)
|
|
|
|
# new_kwargs["config"] = self.config
|
|
|
|
|
|
|
|
# new_kwargs["bot"] = self.bot
|
|
|
|
|
|
|
|
# job_state["kwargs"] = new_kwargs
|
|
|
|
# log.debug(f"Encoding job id: {job.id}\n"
|
|
|
|
# log.debug(f"Encoding job id: {job.id}\n"
|
|
|
|
# f"Encoded as: {out}")
|
|
|
|
# f"Encoded as: {out}")
|
|
|
|
|
|
|
|
|
|
|
@ -76,10 +94,15 @@ class RedConfigJobStore(MemoryJobStore):
|
|
|
|
return None
|
|
|
|
return None
|
|
|
|
job_state = in_job["job_state"]
|
|
|
|
job_state = in_job["job_state"]
|
|
|
|
job_state = pickle.loads(base64.b64decode(job_state))
|
|
|
|
job_state = pickle.loads(base64.b64decode(job_state))
|
|
|
|
new_args = list(job_state["args"])
|
|
|
|
if job_state["args"]: # Backwards compatibility on args to kwargs
|
|
|
|
new_args[0]["config"] = self.config
|
|
|
|
job_state["kwargs"] = {**job_state["args"][0]}
|
|
|
|
new_args[0]["bot"] = self.bot
|
|
|
|
job_state["args"] = []
|
|
|
|
job_state["args"] = tuple(new_args)
|
|
|
|
job_state["kwargs"]["config"] = self.config
|
|
|
|
|
|
|
|
job_state["kwargs"]["bot"] = self.bot
|
|
|
|
|
|
|
|
# new_kwargs = job_state["kwargs"]
|
|
|
|
|
|
|
|
# new_kwargs["config"] = self.config
|
|
|
|
|
|
|
|
# new_kwargs["bot"] = self.bot
|
|
|
|
|
|
|
|
# job_state["kwargs"] = new_kwargs
|
|
|
|
job = Job.__new__(Job)
|
|
|
|
job = Job.__new__(Job)
|
|
|
|
job.__setstate__(job_state)
|
|
|
|
job.__setstate__(job_state)
|
|
|
|
job._scheduler = self._scheduler
|
|
|
|
job._scheduler = self._scheduler
|
|
|
@ -96,78 +119,82 @@ class RedConfigJobStore(MemoryJobStore):
|
|
|
|
|
|
|
|
|
|
|
|
return job
|
|
|
|
return job
|
|
|
|
|
|
|
|
|
|
|
|
@run_in_event_loop
|
|
|
|
# @run_in_event_loop
|
|
|
|
def add_job(self, job: Job):
|
|
|
|
# def add_job(self, job: Job):
|
|
|
|
if job.id in self._jobs_index:
|
|
|
|
# if job.id in self._jobs_index:
|
|
|
|
raise ConflictingIdError(job.id)
|
|
|
|
# raise ConflictingIdError(job.id)
|
|
|
|
# log.debug(f"Check job args: {job.args=}")
|
|
|
|
# # log.debug(f"Check job args: {job.args=}")
|
|
|
|
timestamp = datetime_to_utc_timestamp(job.next_run_time)
|
|
|
|
# timestamp = datetime_to_utc_timestamp(job.next_run_time)
|
|
|
|
index = self._get_job_index(timestamp, job.id) # This is fine
|
|
|
|
# index = self._get_job_index(timestamp, job.id) # This is fine
|
|
|
|
self._jobs.insert(index, (job, timestamp))
|
|
|
|
# self._jobs.insert(index, (job, timestamp))
|
|
|
|
self._jobs_index[job.id] = (job, timestamp)
|
|
|
|
# self._jobs_index[job.id] = (job, timestamp)
|
|
|
|
asyncio.create_task(self._async_add_job(job, index, timestamp))
|
|
|
|
# task = asyncio.create_task(self._async_add_job(job, index, timestamp))
|
|
|
|
# log.debug(f"Added job: {self._jobs[index][0].args}")
|
|
|
|
# self._eventloop.run_until_complete(task)
|
|
|
|
|
|
|
|
# # log.debug(f"Added job: {self._jobs[index][0].args}")
|
|
|
|
async def _async_add_job(self, job, index, timestamp):
|
|
|
|
#
|
|
|
|
encoded_job = self._encode_job(job)
|
|
|
|
# async def _async_add_job(self, job, index, timestamp):
|
|
|
|
job_tuple = tuple([encoded_job, timestamp])
|
|
|
|
# encoded_job = self._encode_job(job)
|
|
|
|
async with self.config.jobs() as jobs:
|
|
|
|
# job_tuple = tuple([encoded_job, timestamp])
|
|
|
|
jobs.insert(index, job_tuple)
|
|
|
|
# async with self.config.jobs() as jobs:
|
|
|
|
# await self.config.jobs_index.set_raw(job.id, value=job_tuple)
|
|
|
|
# jobs.insert(index, job_tuple)
|
|
|
|
return True
|
|
|
|
# # await self.config.jobs_index.set_raw(job.id, value=job_tuple)
|
|
|
|
|
|
|
|
# return True
|
|
|
|
@run_in_event_loop
|
|
|
|
|
|
|
|
def update_job(self, job):
|
|
|
|
# @run_in_event_loop
|
|
|
|
old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
|
|
|
|
# def update_job(self, job):
|
|
|
|
job.id, (None, None)
|
|
|
|
# old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
|
|
|
|
)
|
|
|
|
# job.id, (None, None)
|
|
|
|
old_job = old_tuple[0]
|
|
|
|
# )
|
|
|
|
old_timestamp = old_tuple[1]
|
|
|
|
# old_job = old_tuple[0]
|
|
|
|
if old_job is None:
|
|
|
|
# old_timestamp = old_tuple[1]
|
|
|
|
raise JobLookupError(job.id)
|
|
|
|
# if old_job is None:
|
|
|
|
|
|
|
|
# raise JobLookupError(job.id)
|
|
|
|
# If the next run time has not changed, simply replace the job in its present index.
|
|
|
|
#
|
|
|
|
# Otherwise, reinsert the job to the list to preserve the ordering.
|
|
|
|
# # If the next run time has not changed, simply replace the job in its present index.
|
|
|
|
old_index = self._get_job_index(old_timestamp, old_job.id)
|
|
|
|
# # Otherwise, reinsert the job to the list to preserve the ordering.
|
|
|
|
new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
|
|
|
|
# old_index = self._get_job_index(old_timestamp, old_job.id)
|
|
|
|
asyncio.create_task(
|
|
|
|
# new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
|
|
|
|
self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
|
|
|
|
# task = asyncio.create_task(
|
|
|
|
)
|
|
|
|
# self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
|
|
|
|
|
|
|
|
# )
|
|
|
|
async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
|
|
|
|
# self._eventloop.run_until_complete(task)
|
|
|
|
encoded_job = self._encode_job(job)
|
|
|
|
#
|
|
|
|
if old_timestamp == new_timestamp:
|
|
|
|
# async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
|
|
|
|
self._jobs[old_index] = (job, new_timestamp)
|
|
|
|
# encoded_job = self._encode_job(job)
|
|
|
|
async with self.config.jobs() as jobs:
|
|
|
|
# if old_timestamp == new_timestamp:
|
|
|
|
jobs[old_index] = (encoded_job, new_timestamp)
|
|
|
|
# self._jobs[old_index] = (job, new_timestamp)
|
|
|
|
else:
|
|
|
|
# async with self.config.jobs() as jobs:
|
|
|
|
del self._jobs[old_index]
|
|
|
|
# jobs[old_index] = (encoded_job, new_timestamp)
|
|
|
|
new_index = self._get_job_index(new_timestamp, job.id) # This is fine
|
|
|
|
# else:
|
|
|
|
self._jobs.insert(new_index, (job, new_timestamp))
|
|
|
|
# del self._jobs[old_index]
|
|
|
|
async with self.config.jobs() as jobs:
|
|
|
|
# new_index = self._get_job_index(new_timestamp, job.id) # This is fine
|
|
|
|
del jobs[old_index]
|
|
|
|
# self._jobs.insert(new_index, (job, new_timestamp))
|
|
|
|
jobs.insert(new_index, (encoded_job, new_timestamp))
|
|
|
|
# async with self.config.jobs() as jobs:
|
|
|
|
self._jobs_index[old_job.id] = (job, new_timestamp)
|
|
|
|
# del jobs[old_index]
|
|
|
|
# await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
|
|
|
|
# jobs.insert(new_index, (encoded_job, new_timestamp))
|
|
|
|
|
|
|
|
# self._jobs_index[old_job.id] = (job, new_timestamp)
|
|
|
|
log.debug(f"Async Updated {job.id=}")
|
|
|
|
# # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
|
|
|
|
log.debug(f"Check job args: {job.args=}")
|
|
|
|
#
|
|
|
|
|
|
|
|
# log.debug(f"Async Updated {job.id=}")
|
|
|
|
@run_in_event_loop
|
|
|
|
# # log.debug(f"Check job args: {job.kwargs=}")
|
|
|
|
def remove_job(self, job_id):
|
|
|
|
|
|
|
|
job, timestamp = self._jobs_index.get(job_id, (None, None))
|
|
|
|
# @run_in_event_loop
|
|
|
|
if job is None:
|
|
|
|
# def remove_job(self, job_id):
|
|
|
|
raise JobLookupError(job_id)
|
|
|
|
# """Copied instead of super for the asyncio args"""
|
|
|
|
|
|
|
|
# job, timestamp = self._jobs_index.get(job_id, (None, None))
|
|
|
|
index = self._get_job_index(timestamp, job_id)
|
|
|
|
# if job is None:
|
|
|
|
del self._jobs[index]
|
|
|
|
# raise JobLookupError(job_id)
|
|
|
|
del self._jobs_index[job.id]
|
|
|
|
#
|
|
|
|
asyncio.create_task(self._async_remove_job(index, job))
|
|
|
|
# index = self._get_job_index(timestamp, job_id)
|
|
|
|
|
|
|
|
# del self._jobs[index]
|
|
|
|
async def _async_remove_job(self, index, job):
|
|
|
|
# del self._jobs_index[job.id]
|
|
|
|
async with self.config.jobs() as jobs:
|
|
|
|
# task = asyncio.create_task(self._async_remove_job(index, job))
|
|
|
|
del jobs[index]
|
|
|
|
# self._eventloop.run_until_complete(task)
|
|
|
|
# await self.config.jobs_index.clear_raw(job.id)
|
|
|
|
#
|
|
|
|
|
|
|
|
# async def _async_remove_job(self, index, job):
|
|
|
|
|
|
|
|
# async with self.config.jobs() as jobs:
|
|
|
|
|
|
|
|
# del jobs[index]
|
|
|
|
|
|
|
|
# # await self.config.jobs_index.clear_raw(job.id)
|
|
|
|
|
|
|
|
|
|
|
|
@run_in_event_loop
|
|
|
|
@run_in_event_loop
|
|
|
|
def remove_all_jobs(self):
|
|
|
|
def remove_all_jobs(self):
|
|
|
@ -180,4 +207,8 @@ class RedConfigJobStore(MemoryJobStore):
|
|
|
|
|
|
|
|
|
|
|
|
def shutdown(self):
|
|
|
|
def shutdown(self):
|
|
|
|
"""Removes all jobs without clearing config"""
|
|
|
|
"""Removes all jobs without clearing config"""
|
|
|
|
|
|
|
|
asyncio.create_task(self.async_shutdown())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
async def async_shutdown(self):
|
|
|
|
|
|
|
|
await self.save_to_config()
|
|
|
|
super().remove_all_jobs()
|
|
|
|
super().remove_all_jobs()
|
|
|
|