diff --git a/fifo/redconfigjobstore.py b/fifo/redconfigjobstore.py index 126cfc0..46528bf 100644 --- a/fifo/redconfigjobstore.py +++ b/fifo/redconfigjobstore.py @@ -39,7 +39,7 @@ class RedConfigJobStore(MemoryJobStore): # self._jobs = [ # (await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs) # ] - async for job, timestamp in AsyncIter(_jobs): + async for job, timestamp in AsyncIter(_jobs, steps=5): job = await self._decode_job(job) index = self._get_job_index(timestamp, job.id) self._jobs.insert(index, (job, timestamp)) @@ -109,83 +109,6 @@ class RedConfigJobStore(MemoryJobStore): return job - # @run_in_event_loop - # def add_job(self, job: Job): - # if job.id in self._jobs_index: - # raise ConflictingIdError(job.id) - # # log.debug(f"Check job args: {job.args=}") - # timestamp = datetime_to_utc_timestamp(job.next_run_time) - # index = self._get_job_index(timestamp, job.id) # This is fine - # self._jobs.insert(index, (job, timestamp)) - # self._jobs_index[job.id] = (job, timestamp) - # task = asyncio.create_task(self._async_add_job(job, index, timestamp)) - # self._eventloop.run_until_complete(task) - # # log.debug(f"Added job: {self._jobs[index][0].args}") - # - # async def _async_add_job(self, job, index, timestamp): - # encoded_job = self._encode_job(job) - # job_tuple = tuple([encoded_job, timestamp]) - # async with self.config.jobs() as jobs: - # jobs.insert(index, job_tuple) - # # await self.config.jobs_index.set_raw(job.id, value=job_tuple) - # return True - - # @run_in_event_loop - # def update_job(self, job): - # old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get( - # job.id, (None, None) - # ) - # old_job = old_tuple[0] - # old_timestamp = old_tuple[1] - # if old_job is None: - # raise JobLookupError(job.id) - # - # # If the next run time has not changed, simply replace the job in its present index. - # # Otherwise, reinsert the job to the list to preserve the ordering. - # old_index = self._get_job_index(old_timestamp, old_job.id) - # new_timestamp = datetime_to_utc_timestamp(job.next_run_time) - # task = asyncio.create_task( - # self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp) - # ) - # self._eventloop.run_until_complete(task) - # - # async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp): - # encoded_job = self._encode_job(job) - # if old_timestamp == new_timestamp: - # self._jobs[old_index] = (job, new_timestamp) - # async with self.config.jobs() as jobs: - # jobs[old_index] = (encoded_job, new_timestamp) - # else: - # del self._jobs[old_index] - # new_index = self._get_job_index(new_timestamp, job.id) # This is fine - # self._jobs.insert(new_index, (job, new_timestamp)) - # async with self.config.jobs() as jobs: - # del jobs[old_index] - # jobs.insert(new_index, (encoded_job, new_timestamp)) - # self._jobs_index[old_job.id] = (job, new_timestamp) - # # await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp)) - # - # log.debug(f"Async Updated {job.id=}") - # # log.debug(f"Check job args: {job.kwargs=}") - - # @run_in_event_loop - # def remove_job(self, job_id): - # """Copied instead of super for the asyncio args""" - # job, timestamp = self._jobs_index.get(job_id, (None, None)) - # if job is None: - # raise JobLookupError(job_id) - # - # index = self._get_job_index(timestamp, job_id) - # del self._jobs[index] - # del self._jobs_index[job.id] - # task = asyncio.create_task(self._async_remove_job(index, job)) - # self._eventloop.run_until_complete(task) - # - # async def _async_remove_job(self, index, job): - # async with self.config.jobs() as jobs: - # del jobs[index] - # # await self.config.jobs_index.clear_raw(job.id) - @run_in_event_loop def remove_all_jobs(self): super().remove_all_jobs() @@ -201,4 +124,5 @@ class RedConfigJobStore(MemoryJobStore): async def async_shutdown(self): await self.save_to_config() - super().remove_all_jobs() + self._jobs = [] + self._jobs_index = {}