Almost working. Date time is only date, figure out what's going on with time.

pull/132/head
bobloy 4 years ago
parent c6a9116a92
commit e602b5c868

@ -1,19 +1,32 @@
import logging
from datetime import datetime, timedelta
from typing import Dict, Union
import discord
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.base import BaseTrigger
from apscheduler.triggers.combining import OrTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from dateutil import parser
from redbot.core import Config, checks, commands
from redbot.core.bot import Red
from redbot.core.commands import DictConverter, TimedeltaConverter, parse_timedelta
from redbot.core.commands import DictConverter, TimedeltaConverter
from .datetimeconverter import DatetimeConverter
from .redconfigjobstore import RedConfigJobStore
log = logging.getLogger("red.fox_v3.fifo")
schedule_log = logging.getLogger("red.fox_v3.fifo.scheduler")
schedule_log.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
async def _execute_task(task_state):
log.info(f"Executing {task_state=}")
task = Task(**task_state)
if await task.load_from_config():
return await task.execute()
return False
def get_trigger(data):
@ -37,11 +50,14 @@ def parse_triggers(data: Union[Dict, None]):
if len(data["triggers"]) > 1: # Multiple triggers
return OrTrigger(get_trigger(t_data) for t_data in data["triggers"])
return get_trigger(data[0])
return get_trigger(data["triggers"][0])
class FakeMessage:
_state = None
# class FakeMessage(discord.Message):
# def __init__(self, *, state, channel, data):
# super().__init__(state=state, channel=channel, data=data)
#
# _state = None
# class FakeMessage(discord.Message):
@ -57,35 +73,47 @@ class Task:
"time_data": None, # Used for Interval and Date Triggers
}
def __init__(self, name: str, guild_id, config: Config, author_id=None, bot: Red = None):
def __init__(
self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None
):
self.name = name
self.guild_id = guild_id
self.config = config
self.bot = bot
self.author_id = author_id
self.channel_id = channel_id
self.data = None
async def _encode_time_data(self):
async def _encode_time_triggers(self):
if not self.data or not self.data.get("triggers", None):
return None
return []
triggers = []
for t in self.data["triggers"]:
if t["type"] == "interval": # Convert into timedelta
td: timedelta = t["time_data"]
triggers.append({"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds} })
triggers.append(
{"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}}
)
continue
if t["type"] == "date": # Convert into datetime
dt: datetime = t["time_data"]
triggers.append({"type": t["type"], "time_data": {
"year": dt.year,
"month": dt.month,
"day": dt.day,
"hour": dt.hour,
"minute": dt.minute,
"second": dt.second,
}})
triggers.append(
{
"type": t["type"],
"time_data": {
"year": dt.year,
"month": dt.month,
"day": dt.day,
"hour": dt.hour,
"minute": dt.minute,
"second": dt.second,
},
}
)
continue
if t["type"] == "cron":
raise NotImplemented
@ -93,16 +121,18 @@ class Task:
return triggers
async def _decode_time_data(self):
async def _decode_time_triggers(self):
if not self.data or not self.data.get("triggers", None):
return
for t in self.data["triggers"]:
for n, t in enumerate(self.data["triggers"]):
if t["type"] == "interval": # Convert into timedelta
t["time_data"] = timedelta(**t["time_data"])
self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"])
continue
if t["type"] == "date": # Convert into datetime
t["time_data"] = datetime(**t["time_data"])
self.data["triggers"][n]["time_data"] = datetime(**t["time_data"])
continue
if t["type"] == "cron":
raise NotImplemented
@ -121,10 +151,11 @@ class Task:
self.author_id = data["author_id"]
self.guild_id = data["guild_id"]
self.channel_id = data["channel_id"]
self.data = data["data"]
await self._decode_time_data()
await self._decode_time_triggers()
return self.data
async def get_trigger(self) -> Union[BaseTrigger, None]:
@ -145,11 +176,12 @@ class Task:
data_to_save = self.default_task_data.copy()
if self.data:
data_to_save["command_str"] = self.data.get("command_str", "")
data_to_save["triggers"] = await self._encode_time_data()
data_to_save["triggers"] = await self._encode_time_triggers()
to_save = {
"guild_id": self.guild_id,
"author_id": self.author_id,
"channel_id": self.channel_id,
"data": data_to_save,
}
await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save)
@ -158,23 +190,54 @@ class Task:
"""To be used when updating triggers"""
if not self.data:
return
data_to_save = self.data.copy()
data_to_save["triggers"] = await self._encode_time_triggers()
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
self.name, "data", value=await self._encode_time_data()
self.name, "data", value=data_to_save
)
async def execute(self):
if not self.data or self.data["command_str"]:
if not self.data or not self.data.get("command_str", False):
log.warning(f"Could not execute task due to data problem: {self.data=}")
return False
message = FakeMessage()
message.guild = self.bot.get_guild(self.guild_id) # used for get_prefix
message.author = message.guild.get_member(self.author_id)
message.content = await self.bot.get_prefix(message) + self.data["command_str"]
guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix
if guild is None:
log.warning(f"Could not execute task due to missing guild: {self.guild_id}")
return False
channel: discord.TextChannel = guild.get_channel(self.channel_id)
if channel is None:
log.warning(f"Could not execute task due to missing channel: {self.channel_id}")
return False
author: discord.User = guild.get_member(self.author_id)
if author is None:
log.warning(f"Could not execute task due to missing author: {self.author_id}")
return False
message = channel.last_message
if message is None:
log.warning("No message found in channel cache yet, skipping execution")
return
message.author = author
prefixes = await self.bot.get_prefix(message)
if isinstance(prefixes, str):
prefix = prefixes
else:
prefix = prefixes[0]
message.content = f"{prefix}{self.data['command_str']}"
if not message.guild or not message.author or not message.content:
log.warning(f"Could not execute task due to message problem: {message}")
return False
new_ctx: commands.Context = await self.bot.get_context(message)
new_ctx.assume_yes = True
if not new_ctx.valid:
log.warning(f"Could not execute task due invalid context: {new_ctx}")
return False
await self.bot.invoke(new_ctx)
@ -203,6 +266,31 @@ class Task:
self.data["triggers"].append(trigger_data)
return True
def __setstate__(self, task_state):
self.name = task_state["name"]
self.guild_id = task_state["guild_id"]
self.config = task_state["config"]
self.bot = None
self.author_id = None
self.channel_id = None
self.data = None
def __getstate__(self):
return {
"name": self.name,
"guild_id": self.guild_id,
"config": self.config,
"bot": self.bot,
}
def _assemble_job_id(task_name, guild_id):
return f"{task_name}_{guild_id}"
def _disassemble_job_id(job_id: str):
return job_id.split("_")
class FIFO(commands.Cog):
"""
@ -222,6 +310,8 @@ class FIFO(commands.Cog):
self.config.register_global(**default_global)
self.config.register_guild(**default_guild)
from .redconfigjobstore import RedConfigJobStore
jobstores = {"default": RedConfigJobStore(self.config, self.bot)}
job_defaults = {"coalesce": False, "max_instances": 1}
@ -229,7 +319,9 @@ class FIFO(commands.Cog):
# executors = {"default": AsyncIOExecutor()}
# Default executor is already AsyncIOExecutor
self.scheduler = AsyncIOScheduler(jobstores=jobstores, job_defaults=job_defaults)
self.scheduler = AsyncIOScheduler(
jobstores=jobstores, job_defaults=job_defaults, logger=schedule_log
)
self.scheduler.start()
@ -237,34 +329,48 @@ class FIFO(commands.Cog):
"""Nothing to delete"""
return
def _assemble_job_id(self, task_name, guild_id):
return task_name + "_" + guild_id
async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str):
message = FakeMessage()
message: discord.Message = ctx.message
message.content = ctx.prefix + command_to_parse
message.author = ctx.author
message.guild = ctx.guild
new_ctx: commands.Context = await self.bot.get_context(message)
return new_ctx.valid
async def _get_job(self, task_name, guild_id):
return self.scheduler.get_job(self._assemble_job_id(task_name, guild_id))
async def _process_task(self, task: Task):
job = await self._get_job(task)
if job is not None:
job.remove()
async def _add_job(self, task):
return await self._add_job(task)
async def _get_job(self, task: Task) -> Job:
return self.scheduler.get_job(_assemble_job_id(task.name, task.guild_id))
async def _add_job(self, task: Task):
return self.scheduler.add_job(
task.execute,
id=self._assemble_job_id(task.name, task.guild_id),
_execute_task,
args=[task.__getstate__()],
id=_assemble_job_id(task.name, task.guild_id),
trigger=await task.get_trigger(),
)
async def _pause_job(self, task: Task):
return self.scheduler.pause_job(job_id=_assemble_job_id(task.name, task.guild_id))
async def _remove_job(self, task: Task):
return self.scheduler.remove_job(job_id=_assemble_job_id(task.name, task.guild_id))
@checks.is_owner()
@commands.command()
async def fifoclear(self, ctx: commands.Context):
"""Debug command to clear fifo config"""
"""Debug command to clear all current fifo data"""
await self.config.guild(ctx.guild).tasks.clear()
await self.config.jobs.clear()
await self.config.jobs_index.clear()
self.scheduler.remove_all_jobs()
await ctx.tick()
@checks.is_owner() # Will be reduced when I figure out permissions later
@ -286,7 +392,15 @@ class FIFO(commands.Cog):
if all_guilds:
pass
else:
pass # TODO: parse and display tasks
out = ""
all_tasks = await self.config.guild(ctx.guild).tasks()
for task_name, task_data in all_tasks.items():
out += f"{task_name}: {task_data}\n"
if out:
await ctx.maybe_send_embed(out)
else:
await ctx.maybe_send_embed("No tasks to list")
@fifo.command(name="add")
async def fifo_add(self, ctx: commands.Context, task_name: str, *, command_to_execute: str):
@ -297,11 +411,17 @@ class FIFO(commands.Cog):
await ctx.maybe_send_embed(f"Task already exists with {task_name=}")
return
if "_" in task_name: # See _disassemble_job_id
await ctx.maybe_send_embed("Task name cannot contain underscores")
return
if not await self._check_parsable_command(ctx, command_to_execute):
await ctx.maybe_send_embed("Failed to parse command. Make sure to include the prefix")
await ctx.maybe_send_embed(
"Failed to parse command. Make sure not to include the prefix"
)
return
task = Task(task_name, ctx.guild.id, self.config, ctx.author.id)
task = Task(task_name, ctx.guild.id, self.config, ctx.author.id, ctx.channel.id, self.bot)
await task.set_commmand_str(command_to_execute)
await task.save_all()
await ctx.tick()
@ -329,7 +449,7 @@ class FIFO(commands.Cog):
Add an interval trigger to the specified task
"""
task = Task(task_name, ctx.guild.id, self.config)
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
await task.load_from_config()
if task.data is None:
@ -345,6 +465,7 @@ class FIFO(commands.Cog):
)
return
await task.save_data()
await self._process_task(task)
await ctx.tick()
@fifo_trigger.command(name="date")
@ -372,6 +493,7 @@ class FIFO(commands.Cog):
return
await task.save_data()
await self._process_task(task)
await ctx.tick()
@fifo_trigger.command(name="cron")

@ -8,6 +8,9 @@
"install_msg": "Thank you for installing FIFO.\nGet started with `[p]load fifo`, then `[p]help FIFO`",
"short": "Schedule commands to be run by certain at certain times or intervals\"",
"end_user_data_statement": "This cog does not store any End User Data",
"requirements": [
"apscheduler"
],
"tags": [
"bobloy",
"utilities",

@ -1,42 +1,96 @@
import asyncio
import base64
import logging
import pickle
from apscheduler.job import Job
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.util import datetime_to_utc_timestamp
from redbot.core import Config
# TODO: use get_lock on config
from redbot.core.bot import Red
log = logging.getLogger("red.fox_v3.fifo.jobstore")
log.setLevel(logging.DEBUG)
save_task_objects = []
class RedConfigJobStore(MemoryJobStore):
def __init__(self, config: Config, bot: Red):
super().__init__()
self.config = config
# nest_asyncio.apply()
self.bot = bot
self.pickle_protocol = pickle.HIGHEST_PROTOCOL
asyncio.ensure_future(self._load_from_config(), loop=self.bot.loop)
async def _load_from_config(self):
self._jobs = await self.config.jobs()
self._jobs = [
(await self._decode_job(job["job_state"]), timestamp)
for (job, timestamp) in self._jobs
]
self._jobs_index = await self.config.jobs_index.all()
def add_job(self, job):
self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
def _encode_job(self, job: Job):
log.info(f"Encoding job id: {job.id}")
job_state = job.__getstate__()
new_args = list(job_state["args"])
new_args[0]["config"] = None
new_args[0]["bot"] = None
job_state["args"] = tuple(new_args)
encoded = base64.b64encode(pickle.dumps(job_state, self.pickle_protocol))
out = {
"_id": job.id,
"next_run_time": datetime_to_utc_timestamp(job.next_run_time),
"job_state": encoded.decode("ascii"),
}
new_args = list(job_state["args"])
new_args[0]["config"] = self.config
new_args[0]["bot"] = self.bot
job_state["args"] = tuple(new_args)
log.info(f"After encode: Check job args: {job.args=}")
return out
async def _decode_job(self, job_state):
job_state = pickle.loads(base64.b64decode(job_state))
new_args = list(job_state["args"])
new_args[0]["config"] = self.config
new_args[0]["bot"] = self.bot
job_state["args"] = tuple(new_args)
job = Job.__new__(Job)
job.__setstate__(job_state)
job._scheduler = self._scheduler
job._jobstore_alias = self._alias
# task_name, guild_id = _disassemble_job_id(job.id)
# task = Task(task_name, guild_id, self.config)
# await task.load_from_config()
# save_task_objects.append(task)
#
# job.func = task.execute
log.info(f"Decoded job id: {job.id}")
return job
def add_job(self, job: Job):
if job.id in self._jobs_index:
raise ConflictingIdError(job.id)
log.info(f"Check job args: {job.args=}")
timestamp = datetime_to_utc_timestamp(job.next_run_time)
index = self._get_job_index(timestamp, job.id) # This is fine
self._jobs.insert(index, (job, timestamp))
self._jobs_index[job.id] = (job, timestamp)
asyncio.create_task(self._async_add_job(job, index, timestamp))
log.info(f"Added job: {self._jobs[index][0].args}")
async def _async_add_job(self, job, index, timestamp):
async with self.config.jobs() as jobs:
jobs.insert(index, (job, timestamp))
await self.config.jobs_index.set_raw(job.id, value=(job, timestamp))
jobs.insert(index, (self._encode_job(job), timestamp))
await self.config.jobs_index.set_raw(job.id, value=(self._encode_job(job), timestamp))
return True
def update_job(self, job):
@ -48,22 +102,29 @@ class RedConfigJobStore(MemoryJobStore):
# Otherwise, reinsert the job to the list to preserve the ordering.
old_index = self._get_job_index(old_timestamp, old_job.id)
new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
asyncio.ensure_future(self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp), loop=self.bot.loop)
asyncio.ensure_future(
self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp),
loop=self.bot.loop,
)
async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
encoded_job = self._encode_job(job)
if old_timestamp == new_timestamp:
self._jobs[old_index] = (job, new_timestamp)
async with self.config.jobs() as jobs:
jobs[old_index] = (job, new_timestamp)
jobs[old_index] = (encoded_job, new_timestamp)
else:
del self._jobs[old_index]
new_index = self._get_job_index(new_timestamp, job.id) # This is fine
self._jobs.insert(new_index, (job, new_timestamp))
async with self.config.jobs() as jobs:
del jobs[old_index]
jobs.insert(new_index, (job, new_timestamp))
jobs.insert(new_index, (encoded_job, new_timestamp))
self._jobs_index[old_job.id] = (job, new_timestamp)
await self.config.jobs_index.set_raw(old_job.id, value=(job, new_timestamp))
await self.config.jobs_index.set_raw(old_job.id, value=(encoded_job, new_timestamp))
log.info(f"Async Updated {job.id=}")
log.info(f"Check job args: {job.args=}")
def remove_job(self, job_id):
job, timestamp = self._jobs_index.get(job_id, (None, None))

@ -0,0 +1,44 @@
import six
from apscheduler.job import Job
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.schedulers.base import STATE_STOPPED
from apscheduler.util import undefined
class RedJob(Job):
pass
class RedAsyncIOScheduler(AsyncIOScheduler):
def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
next_run_time=undefined, jobstore='default', executor='default',
replace_existing=False, **trigger_args):
job_kwargs = {
'trigger': self._create_trigger(trigger, trigger_args),
'executor': executor,
'func': func,
'args': tuple(args) if args is not None else (),
'kwargs': dict(kwargs) if kwargs is not None else {},
'id': id,
'name': name,
'misfire_grace_time': misfire_grace_time,
'coalesce': coalesce,
'max_instances': max_instances,
'next_run_time': next_run_time
}
job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
value is not undefined)
job = RedJob(self, **job_kwargs)
# Don't really add jobs to job stores before the scheduler is up and running
with self._jobstores_lock:
if self.state == STATE_STOPPED:
self._pending_jobs.append((job, jobstore, replace_existing))
self._logger.info('Adding job tentatively -- it will be properly scheduled when '
'the scheduler starts')
else:
self._real_add_job(job, jobstore, replace_existing)
return job
Loading…
Cancel
Save