Clear old code, shutdown manually, AsyncIter steps

pull/167/head
bobloy 4 years ago
parent 0ff56d933b
commit 9411fff5e8

@ -39,7 +39,7 @@ class RedConfigJobStore(MemoryJobStore):
# self._jobs = [
# (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
# ]
async for job, timestamp in AsyncIter(_jobs):
async for job, timestamp in AsyncIter(_jobs, steps=5):
job = await self._decode_job(job)
index = self._get_job_index(timestamp, job.id)
self._jobs.insert(index, (job, timestamp))
@ -109,83 +109,6 @@ class RedConfigJobStore(MemoryJobStore):
return job
# @run_in_event_loop
# def add_job(self, job: Job):
# if job.id in self._jobs_index:
# raise ConflictingIdError(job.id)
# # log.debug(f"Check job args: {job.args=}")
# timestamp = datetime_to_utc_timestamp(job.next_run_time)
# index = self._get_job_index(timestamp, job.id) # This is fine
# self._jobs.insert(index, (job, timestamp))
# self._jobs_index[job.id] = (job, timestamp)
# task = asyncio.create_task(self._async_add_job(job, index, timestamp))
# self._eventloop.run_until_complete(task)
# # log.debug(f"Added job: {self._jobs[index][0].args}")
#
# async def _async_add_job(self, job, index, timestamp):
# encoded_job = self._encode_job(job)
# job_tuple = tuple([encoded_job, timestamp])
# async with self.config.jobs() as jobs:
# jobs.insert(index, job_tuple)
# # await self.config.jobs_index.set_raw(job.id, value=job_tuple)
# return True
# @run_in_event_loop
# def update_job(self, job):
# old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
# job.id, (None, None)
# )
# old_job = old_tuple[0]
# old_timestamp = old_tuple[1]
# if old_job is None:
# raise JobLookupError(job.id)
#
# # If the next run time has not changed, simply replace the job in its present index.
# # Otherwise, reinsert the job to the list to preserve the ordering.
# old_index = self._get_job_index(old_timestamp, old_job.id)
# new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
# task = asyncio.create_task(
# self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
# )
# self._eventloop.run_until_complete(task)
#
# async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
# encoded_job = self._encode_job(job)
# if old_timestamp == new_timestamp:
# self._jobs[old_index] = (job, new_timestamp)
# async with self.config.jobs() as jobs:
# jobs[old_index] = (encoded_job, new_timestamp)
# else:
# del self._jobs[old_index]
# new_index = self._get_job_index(new_timestamp, job.id) # This is fine
# self._jobs.insert(new_index, (job, new_timestamp))
# async with self.config.jobs() as jobs:
# del jobs[old_index]
# jobs.insert(new_index, (encoded_job, new_timestamp))
# self._jobs_index[old_job.id] = (job, new_timestamp)
# # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
#
# log.debug(f"Async Updated {job.id=}")
# # log.debug(f"Check job args: {job.kwargs=}")
# @run_in_event_loop
# def remove_job(self, job_id):
# """Copied instead of super for the asyncio args"""
# job, timestamp = self._jobs_index.get(job_id, (None, None))
# if job is None:
# raise JobLookupError(job_id)
#
# index = self._get_job_index(timestamp, job_id)
# del self._jobs[index]
# del self._jobs_index[job.id]
# task = asyncio.create_task(self._async_remove_job(index, job))
# self._eventloop.run_until_complete(task)
#
# async def _async_remove_job(self, index, job):
# async with self.config.jobs() as jobs:
# del jobs[index]
# # await self.config.jobs_index.clear_raw(job.id)
@run_in_event_loop
def remove_all_jobs(self):
super().remove_all_jobs()
@ -201,4 +124,5 @@ class RedConfigJobStore(MemoryJobStore):
async def async_shutdown(self):
await self.save_to_config()
super().remove_all_jobs()
self._jobs = []
self._jobs_index = {}

Loading…
Cancel
Save