Start of Cron, fixed jobstore, add pause and resume, split task,
This commit is contained in:
parent
f24183d4f2
commit
e1d314cc83
@ -4,3 +4,8 @@ from .fifo import FIFO
|
||||
async def setup(bot):
|
||||
cog = FIFO(bot)
|
||||
bot.add_cog(cog)
|
||||
await cog.initialize()
|
||||
|
||||
|
||||
def teardown(bot):
|
||||
pass
|
||||
|
@ -1,6 +1,7 @@
|
||||
from datetime import datetime
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from discord.ext.commands import BadArgument, Converter
|
||||
from dateutil import parser
|
||||
|
||||
@ -8,6 +9,7 @@ from fifo.timezones import assemble_timezones
|
||||
|
||||
if TYPE_CHECKING:
|
||||
DatetimeConverter = datetime
|
||||
CronConverter = str
|
||||
else:
|
||||
class DatetimeConverter(Converter):
|
||||
async def convert(self, ctx, argument) -> datetime:
|
||||
@ -15,3 +17,12 @@ else:
|
||||
if dt is not None:
|
||||
return dt
|
||||
raise BadArgument()
|
||||
|
||||
class CronConverter(Converter):
|
||||
async def convert(self, ctx, argument) -> str:
|
||||
try:
|
||||
CronTrigger.from_crontab(argument)
|
||||
except ValueError:
|
||||
raise BadArgument()
|
||||
|
||||
return argument
|
417
fifo/fifo.py
417
fifo/fifo.py
@ -1,31 +1,23 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Optional, Union
|
||||
from typing import Optional, Union
|
||||
|
||||
import discord
|
||||
from apscheduler.job import Job
|
||||
from apscheduler.jobstores.base import JobLookupError
|
||||
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
||||
from apscheduler.schedulers.base import STATE_PAUSED, STATE_RUNNING
|
||||
from apscheduler.triggers.base import BaseTrigger
|
||||
from apscheduler.triggers.combining import OrTrigger
|
||||
from apscheduler.triggers.date import DateTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from discord.utils import time_snowflake
|
||||
from redbot.core import Config, checks, commands
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.commands import DictConverter, TimedeltaConverter
|
||||
|
||||
from .datetimeconverter import DatetimeConverter
|
||||
from .datetime_cron_converters import CronConverter, DatetimeConverter
|
||||
from .task import Task
|
||||
|
||||
log = logging.getLogger("red.fox_v3.fifo")
|
||||
schedule_log = logging.getLogger("red.fox_v3.fifo.scheduler")
|
||||
schedule_log.setLevel(logging.DEBUG)
|
||||
log.setLevel(logging.DEBUG)
|
||||
|
||||
|
||||
async def _do_nothing(*args, **kwargs):
|
||||
pass
|
||||
log = logging.getLogger("red.fox_v3.fifo")
|
||||
|
||||
|
||||
async def _execute_task(task_state):
|
||||
@ -36,279 +28,6 @@ async def _execute_task(task_state):
|
||||
return False
|
||||
|
||||
|
||||
def get_trigger(data):
|
||||
if data["type"] == "interval":
|
||||
parsed_time = data["time_data"]
|
||||
return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds)
|
||||
|
||||
if data["type"] == "date":
|
||||
return DateTrigger(data["time_data"])
|
||||
|
||||
if data["type"] == "cron":
|
||||
return None # TODO: Cron parsing
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def parse_triggers(data: Union[Dict, None]):
|
||||
if data is None or not data.get("triggers", False): # No triggers
|
||||
return None
|
||||
|
||||
if len(data["triggers"]) > 1: # Multiple triggers
|
||||
return OrTrigger(get_trigger(t_data) for t_data in data["triggers"])
|
||||
|
||||
return get_trigger(data["triggers"][0])
|
||||
|
||||
|
||||
class FakeMessage2(discord.Message):
|
||||
__slots__ = ("__dict__",)
|
||||
|
||||
|
||||
class FakeMessage:
|
||||
def __init__(self, message: discord.Message):
|
||||
d = {k: getattr(message, k, None) for k in dir(message)}
|
||||
self.__dict__.update(**d)
|
||||
|
||||
|
||||
class Task:
|
||||
default_task_data = {"triggers": [], "command_str": ""}
|
||||
|
||||
default_trigger = {
|
||||
"type": "",
|
||||
"time_data": None, # Used for Interval and Date Triggers
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None
|
||||
):
|
||||
self.name = name
|
||||
self.guild_id = guild_id
|
||||
self.config = config
|
||||
self.bot = bot
|
||||
self.author_id = author_id
|
||||
self.channel_id = channel_id
|
||||
self.data = None
|
||||
|
||||
async def _encode_time_triggers(self):
|
||||
if not self.data or not self.data.get("triggers", None):
|
||||
return []
|
||||
|
||||
triggers = []
|
||||
for t in self.data["triggers"]:
|
||||
if t["type"] == "interval": # Convert into timedelta
|
||||
td: timedelta = t["time_data"]
|
||||
|
||||
triggers.append(
|
||||
{"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}}
|
||||
)
|
||||
continue
|
||||
|
||||
if t["type"] == "date": # Convert into datetime
|
||||
dt: datetime = t["time_data"]
|
||||
triggers.append({"type": t["type"], "time_data": dt.isoformat()})
|
||||
# triggers.append(
|
||||
# {
|
||||
# "type": t["type"],
|
||||
# "time_data": {
|
||||
# "year": dt.year,
|
||||
# "month": dt.month,
|
||||
# "day": dt.day,
|
||||
# "hour": dt.hour,
|
||||
# "minute": dt.minute,
|
||||
# "second": dt.second,
|
||||
# "tzinfo": dt.tzinfo,
|
||||
# },
|
||||
# }
|
||||
# )
|
||||
continue
|
||||
|
||||
if t["type"] == "cron":
|
||||
raise NotImplemented
|
||||
raise NotImplemented
|
||||
|
||||
return triggers
|
||||
|
||||
async def _decode_time_triggers(self):
|
||||
if not self.data or not self.data.get("triggers", None):
|
||||
return
|
||||
|
||||
for n, t in enumerate(self.data["triggers"]):
|
||||
if t["type"] == "interval": # Convert into timedelta
|
||||
self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"])
|
||||
continue
|
||||
|
||||
if t["type"] == "date": # Convert into datetime
|
||||
# self.data["triggers"][n]["time_data"] = datetime(**t["time_data"])
|
||||
self.data["triggers"][n]["time_data"] = datetime.fromisoformat(t["time_data"])
|
||||
continue
|
||||
|
||||
if t["type"] == "cron":
|
||||
raise NotImplemented
|
||||
raise NotImplemented
|
||||
|
||||
# async def load_from_data(self, data: Dict):
|
||||
# self.data = data.copy()
|
||||
|
||||
async def load_from_config(self):
|
||||
data = await self.config.guild_from_id(self.guild_id).tasks.get_raw(
|
||||
self.name, default=None
|
||||
)
|
||||
|
||||
if not data:
|
||||
return
|
||||
|
||||
self.author_id = data["author_id"]
|
||||
self.guild_id = data["guild_id"]
|
||||
self.channel_id = data["channel_id"]
|
||||
|
||||
self.data = data["data"]
|
||||
|
||||
await self._decode_time_triggers()
|
||||
return self.data
|
||||
|
||||
async def get_triggers(self) -> List[Union[IntervalTrigger, DateTrigger]]:
|
||||
if not self.data:
|
||||
await self.load_from_config()
|
||||
|
||||
if self.data is None or "triggers" not in self.data: # No triggers
|
||||
return []
|
||||
|
||||
return [get_trigger(t) for t in self.data["triggers"]]
|
||||
|
||||
async def get_combined_trigger(self) -> Union[BaseTrigger, None]:
|
||||
if not self.data:
|
||||
await self.load_from_config()
|
||||
|
||||
return parse_triggers(self.data)
|
||||
|
||||
# async def set_job_id(self, job_id):
|
||||
# if self.data is None:
|
||||
# await self.load_from_config()
|
||||
#
|
||||
# self.data["job_id"] = job_id
|
||||
|
||||
async def save_all(self):
|
||||
"""To be used when creating an new task"""
|
||||
|
||||
data_to_save = self.default_task_data.copy()
|
||||
if self.data:
|
||||
data_to_save["command_str"] = self.get_command_str()
|
||||
data_to_save["triggers"] = await self._encode_time_triggers()
|
||||
|
||||
to_save = {
|
||||
"guild_id": self.guild_id,
|
||||
"author_id": self.author_id,
|
||||
"channel_id": self.channel_id,
|
||||
"data": data_to_save,
|
||||
}
|
||||
await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save)
|
||||
|
||||
async def save_data(self):
|
||||
"""To be used when updating triggers"""
|
||||
if not self.data:
|
||||
return
|
||||
|
||||
data_to_save = self.data.copy()
|
||||
data_to_save["triggers"] = await self._encode_time_triggers()
|
||||
|
||||
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
|
||||
self.name, "data", value=data_to_save
|
||||
)
|
||||
|
||||
async def execute(self):
|
||||
if not self.data or not self.get_command_str():
|
||||
log.warning(f"Could not execute task due to data problem: {self.data=}")
|
||||
return False
|
||||
|
||||
guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix
|
||||
if guild is None:
|
||||
log.warning(f"Could not execute task due to missing guild: {self.guild_id}")
|
||||
return False
|
||||
channel: discord.TextChannel = guild.get_channel(self.channel_id)
|
||||
if channel is None:
|
||||
log.warning(f"Could not execute task due to missing channel: {self.channel_id}")
|
||||
return False
|
||||
author: discord.User = guild.get_member(self.author_id)
|
||||
if author is None:
|
||||
log.warning(f"Could not execute task due to missing author: {self.author_id}")
|
||||
return False
|
||||
|
||||
actual_message: discord.Message = channel.last_message
|
||||
if actual_message is None:
|
||||
log.warning("No message found in channel cache yet, skipping execution")
|
||||
return
|
||||
|
||||
message = FakeMessage(actual_message)
|
||||
# message = FakeMessage2
|
||||
message.author = author
|
||||
message.id = time_snowflake(datetime.now()) # Pretend to be now
|
||||
message.add_reaction = _do_nothing
|
||||
|
||||
prefixes = await self.bot.get_prefix(message)
|
||||
if isinstance(prefixes, str):
|
||||
prefix = prefixes
|
||||
else:
|
||||
prefix = prefixes[0]
|
||||
|
||||
message.content = f"{prefix}{self.get_command_str()}"
|
||||
|
||||
if not message.guild or not message.author or not message.content:
|
||||
log.warning(f"Could not execute task due to message problem: {message}")
|
||||
return False
|
||||
|
||||
new_ctx: commands.Context = await self.bot.get_context(message)
|
||||
new_ctx.assume_yes = True
|
||||
if not new_ctx.valid:
|
||||
log.warning(f"Could not execute task due invalid context: {new_ctx}")
|
||||
return False
|
||||
|
||||
await self.bot.invoke(new_ctx)
|
||||
return True
|
||||
|
||||
async def set_bot(self, bot: Red):
|
||||
self.bot = bot
|
||||
|
||||
async def set_author(self, author: Union[discord.User, str]):
|
||||
self.author_id = getattr(author, "id", None) or author
|
||||
|
||||
def get_command_str(self):
|
||||
return self.data.get("command_str", "")
|
||||
|
||||
async def set_commmand_str(self, command_str):
|
||||
if not self.data:
|
||||
self.data = self.default_task_data.copy()
|
||||
self.data["command_str"] = command_str
|
||||
return True
|
||||
|
||||
async def add_trigger(self, param, parsed_time: Union[timedelta, datetime]):
|
||||
trigger_data = {"type": param, "time_data": parsed_time}
|
||||
if not get_trigger(trigger_data):
|
||||
return False
|
||||
|
||||
if not self.data:
|
||||
self.data = self.default_task_data.copy()
|
||||
|
||||
self.data["triggers"].append(trigger_data)
|
||||
return True
|
||||
|
||||
def __setstate__(self, task_state):
|
||||
self.name = task_state["name"]
|
||||
self.guild_id = task_state["guild_id"]
|
||||
self.config = task_state["config"]
|
||||
self.bot = None
|
||||
self.author_id = None
|
||||
self.channel_id = None
|
||||
self.data = None
|
||||
|
||||
def __getstate__(self):
|
||||
return {
|
||||
"name": self.name,
|
||||
"guild_id": self.guild_id,
|
||||
"config": self.config,
|
||||
"bot": self.bot,
|
||||
}
|
||||
|
||||
|
||||
def _assemble_job_id(task_name, guild_id):
|
||||
return f"{task_name}_{guild_id}"
|
||||
|
||||
@ -335,20 +54,8 @@ class FIFO(commands.Cog):
|
||||
self.config.register_global(**default_global)
|
||||
self.config.register_guild(**default_guild)
|
||||
|
||||
from .redconfigjobstore import RedConfigJobStore
|
||||
|
||||
jobstores = {"default": RedConfigJobStore(self.config, self.bot)}
|
||||
|
||||
job_defaults = {"coalesce": False, "max_instances": 1}
|
||||
|
||||
# executors = {"default": AsyncIOExecutor()}
|
||||
|
||||
# Default executor is already AsyncIOExecutor
|
||||
self.scheduler = AsyncIOScheduler(
|
||||
jobstores=jobstores, job_defaults=job_defaults, logger=schedule_log
|
||||
)
|
||||
|
||||
self.scheduler.start() # TODO: Jobs are not receiving next_run_times
|
||||
self.scheduler = None
|
||||
self.jobstore = None
|
||||
|
||||
async def red_delete_data_for_user(self, **kwargs):
|
||||
"""Nothing to delete"""
|
||||
@ -356,7 +63,25 @@ class FIFO(commands.Cog):
|
||||
|
||||
def cog_unload(self):
|
||||
# self.scheduler.remove_all_jobs()
|
||||
self.scheduler.shutdown()
|
||||
if self.scheduler is not None:
|
||||
self.scheduler.shutdown()
|
||||
|
||||
async def initialize(self):
|
||||
|
||||
job_defaults = {"coalesce": False, "max_instances": 1}
|
||||
|
||||
# executors = {"default": AsyncIOExecutor()}
|
||||
|
||||
# Default executor is already AsyncIOExecutor
|
||||
self.scheduler = AsyncIOScheduler(job_defaults=job_defaults, logger=schedule_log)
|
||||
|
||||
from .redconfigjobstore import RedConfigJobStore
|
||||
|
||||
self.jobstore = RedConfigJobStore(self.config, self.bot)
|
||||
await self.jobstore.load_from_config(self.scheduler, "default")
|
||||
self.scheduler.add_jobstore(self.jobstore, "default")
|
||||
|
||||
self.scheduler.start() # TODO: Jobs are not receiving next_run_times
|
||||
|
||||
async def _check_parsable_command(self, ctx: commands.Context, command_to_parse: str):
|
||||
message: discord.Message = ctx.message
|
||||
@ -400,6 +125,7 @@ class FIFO(commands.Cog):
|
||||
return self.scheduler.remove_job(job_id=_assemble_job_id(task.name, task.guild_id))
|
||||
|
||||
@checks.is_owner()
|
||||
@commands.guild_only()
|
||||
@commands.command()
|
||||
async def fifoclear(self, ctx: commands.Context):
|
||||
"""Debug command to clear all current fifo data"""
|
||||
@ -410,6 +136,7 @@ class FIFO(commands.Cog):
|
||||
await ctx.tick()
|
||||
|
||||
@checks.is_owner() # Will be reduced when I figure out permissions later
|
||||
@commands.guild_only()
|
||||
@commands.group()
|
||||
async def fifo(self, ctx: commands.Context):
|
||||
"""
|
||||
@ -418,8 +145,46 @@ class FIFO(commands.Cog):
|
||||
if ctx.invoked_subcommand is None:
|
||||
pass
|
||||
|
||||
@fifo.command(name="set")
|
||||
async def fifo_setauthor(self, ctx: commands.Context, task_name: str, author_or_channel: Union[discord.Member, discord.TextChannel]):
|
||||
"""
|
||||
Sets the task to be executed as a different author or in a different channel.
|
||||
"""
|
||||
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
|
||||
await task.load_from_config()
|
||||
|
||||
if task.data is None:
|
||||
await ctx.maybe_send_embed(
|
||||
f"Task by the name of {task_name} is not found in this guild"
|
||||
)
|
||||
return
|
||||
|
||||
if isinstance(author_or_channel, discord.Member):
|
||||
if task.author_id == author_or_channel.id:
|
||||
await ctx.maybe_send_embed("Already executing as that member")
|
||||
return
|
||||
|
||||
await task.set_author(author_or_channel) # also saves
|
||||
elif isinstance(author_or_channel, discord.TextChannel):
|
||||
if task.channel_id == author_or_channel.id:
|
||||
await ctx.maybe_send_embed("Already executing in that channel")
|
||||
return
|
||||
|
||||
await task.set_channel(author_or_channel)
|
||||
else:
|
||||
await ctx.maybe_send_embed("Unsupported result")
|
||||
return
|
||||
|
||||
await ctx.tick()
|
||||
|
||||
@fifo.command(name="resume")
|
||||
async def fifo_resume(self, ctx: commands.Context, task_name: Optional[str] = None):
|
||||
"""
|
||||
Provide a task name to resume execution of a task.
|
||||
|
||||
Otherwise resumes execution of all tasks on all guilds
|
||||
If the task isn't currently scheduled, will schedule it
|
||||
"""
|
||||
if task_name is None:
|
||||
if self.scheduler.state == STATE_PAUSED:
|
||||
self.scheduler.resume()
|
||||
@ -443,6 +208,11 @@ class FIFO(commands.Cog):
|
||||
|
||||
@fifo.command(name="pause")
|
||||
async def fifo_pause(self, ctx: commands.Context, task_name: Optional[str] = None):
|
||||
"""
|
||||
Provide a task name to pause execution of a task
|
||||
|
||||
Otherwise pauses execution of all tasks on all guilds
|
||||
"""
|
||||
if task_name is None:
|
||||
if self.scheduler.state == STATE_RUNNING:
|
||||
self.scheduler.pause()
|
||||
@ -468,7 +238,6 @@ class FIFO(commands.Cog):
|
||||
async def fifo_details(self, ctx: commands.Context, task_name: str):
|
||||
"""
|
||||
Provide all the details on the specified task name
|
||||
|
||||
"""
|
||||
task = Task(task_name, ctx.guild.id, self.config, bot=self.bot)
|
||||
await task.load_from_config()
|
||||
@ -633,33 +402,31 @@ class FIFO(commands.Cog):
|
||||
|
||||
@fifo_trigger.command(name="cron")
|
||||
async def fifo_trigger_cron(
|
||||
self, ctx: commands.Context, task_name: str, cron_settings: DictConverter
|
||||
self, ctx: commands.Context, task_name: str, *, cron_str: CronConverter
|
||||
):
|
||||
"""
|
||||
Add a "time of day" trigger to the specified task
|
||||
"""
|
||||
await ctx.maybe_send_embed("Not yet implemented")
|
||||
task = Task(task_name, ctx.guild.id, self.config)
|
||||
await task.load_from_config()
|
||||
|
||||
# async def load_tasks(self):
|
||||
# """
|
||||
# Run once on cog load.
|
||||
# """
|
||||
# all_guilds = await self.config.all_guilds()
|
||||
# async for guild_id, guild_data in AsyncIter(all_guilds["tasks"].items(), steps=100):
|
||||
# for task_name, task_data in guild_data["tasks"].items():
|
||||
# task = Task(task_name, guild_id, self.config)
|
||||
# await task.load_from_data(task_data)
|
||||
#
|
||||
# job = self.scheduler.add_job(
|
||||
# task.execute, id=task_name + "_" + guild_id, trigger=await task.get_combined_trigger(),
|
||||
# )
|
||||
#
|
||||
# self.scheduler.start()
|
||||
if task.data is None:
|
||||
await ctx.maybe_send_embed(
|
||||
f"Task by the name of {task_name} is not found in this guild"
|
||||
)
|
||||
return
|
||||
|
||||
# async def parent_loop(self):
|
||||
# await asyncio.sleep(60)
|
||||
# asyncio.create_task(self.process_tasks(datetime.datetime.utcnow()))
|
||||
#
|
||||
# async def process_tasks(self, now: datetime.datetime):
|
||||
# for task in self.tasks:
|
||||
# pass
|
||||
result = await task.add_trigger("cron", cron_str)
|
||||
if not result:
|
||||
await ctx.maybe_send_embed(
|
||||
"Failed to add a cron trigger to this task, see console for logs"
|
||||
)
|
||||
return
|
||||
|
||||
await task.save_data()
|
||||
job: Job = await self._process_task(task)
|
||||
delta_from_now: timedelta = job.next_run_time - datetime.now(job.next_run_time.tzinfo)
|
||||
await ctx.maybe_send_embed(
|
||||
f"Task `{task_name}` added cron_str to its scheduled runtimes\n"
|
||||
f"Next run time: {job.next_run_time} ({delta_from_now.total_seconds()} seconds)"
|
||||
)
|
||||
|
@ -3,16 +3,19 @@ import base64
|
||||
import logging
|
||||
import pickle
|
||||
from datetime import datetime
|
||||
from time import sleep
|
||||
from typing import Tuple, Union
|
||||
|
||||
from apscheduler.job import Job
|
||||
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError
|
||||
from apscheduler.jobstores.memory import MemoryJobStore
|
||||
from apscheduler.schedulers.asyncio import run_in_event_loop
|
||||
from apscheduler.util import datetime_to_utc_timestamp
|
||||
from redbot.core import Config
|
||||
|
||||
# TODO: use get_lock on config
|
||||
from redbot.core.bot import Red
|
||||
from redbot.core.utils import AsyncIter
|
||||
|
||||
log = logging.getLogger("red.fox_v3.fifo.jobstore")
|
||||
log.setLevel(logging.DEBUG)
|
||||
@ -26,19 +29,29 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
self.config = config
|
||||
self.bot = bot
|
||||
self.pickle_protocol = pickle.HIGHEST_PROTOCOL
|
||||
asyncio.ensure_future(self._load_from_config(), loop=self.bot.loop)
|
||||
self._eventloop = self.bot.loop
|
||||
# TODO: self.config.jobs_index is never read from,
|
||||
# either remove or replace self._jobs_index
|
||||
|
||||
async def _load_from_config(self):
|
||||
self._jobs = await self.config.jobs()
|
||||
# task = asyncio.create_task(self.load_from_config())
|
||||
# while not task.done():
|
||||
# sleep(0.1)
|
||||
# future = asyncio.ensure_future(self.load_from_config(), loop=self.bot.loop)
|
||||
|
||||
@run_in_event_loop
|
||||
def start(self, scheduler, alias):
|
||||
super().start(scheduler, alias)
|
||||
|
||||
async def load_from_config(self, scheduler, alias):
|
||||
super().start(scheduler, alias)
|
||||
_jobs = await self.config.jobs()
|
||||
self._jobs = [
|
||||
(await self._decode_job(job["job_state"]), timestamp)
|
||||
for (job, timestamp) in self._jobs
|
||||
(await self._decode_job(job), timestamp) async for (job, timestamp) in AsyncIter(_jobs)
|
||||
]
|
||||
self._jobs_index = await self.config.jobs_index.all()
|
||||
# self._jobs_index = await self.config.jobs_index.all() # Overwritten by next
|
||||
self._jobs_index = {job.id: (job, timestamp) for job, timestamp in self._jobs}
|
||||
|
||||
def _encode_job(self, job: Job):
|
||||
# log.debug(f"Encoding job id: {job.id}")
|
||||
job_state = job.__getstate__()
|
||||
new_args = list(job_state["args"])
|
||||
new_args[0]["config"] = None
|
||||
@ -54,9 +67,15 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
new_args[0]["config"] = self.config
|
||||
new_args[0]["bot"] = self.bot
|
||||
job_state["args"] = tuple(new_args)
|
||||
# log.debug(f"Encoding job id: {job.id}\n"
|
||||
# f"Encoded as: {out}")
|
||||
|
||||
return out
|
||||
|
||||
async def _decode_job(self, job_state):
|
||||
async def _decode_job(self, in_job):
|
||||
if in_job is None:
|
||||
return None
|
||||
job_state = in_job["job_state"]
|
||||
job_state = pickle.loads(base64.b64decode(job_state))
|
||||
new_args = list(job_state["args"])
|
||||
new_args[0]["config"] = self.config
|
||||
@ -73,10 +92,12 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
#
|
||||
# job.func = task.execute
|
||||
|
||||
# log.debug(f"Decoded job id: {job.id}")
|
||||
# log.debug(f"Decoded job id: {job.id}\n"
|
||||
# f"Decoded as {job_state}")
|
||||
|
||||
return job
|
||||
|
||||
@run_in_event_loop
|
||||
def add_job(self, job: Job):
|
||||
if job.id in self._jobs_index:
|
||||
raise ConflictingIdError(job.id)
|
||||
@ -89,11 +110,14 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
# log.debug(f"Added job: {self._jobs[index][0].args}")
|
||||
|
||||
async def _async_add_job(self, job, index, timestamp):
|
||||
encoded_job = self._encode_job(job)
|
||||
job_tuple = tuple([encoded_job, timestamp])
|
||||
async with self.config.jobs() as jobs:
|
||||
jobs.insert(index, (self._encode_job(job), timestamp))
|
||||
await self.config.jobs_index.set_raw(job.id, value=(self._encode_job(job), timestamp))
|
||||
jobs.insert(index, job_tuple)
|
||||
await self.config.jobs_index.set_raw(job.id, value=job_tuple)
|
||||
return True
|
||||
|
||||
@run_in_event_loop
|
||||
def update_job(self, job):
|
||||
old_tuple: Tuple[Union[Job, None], Union[datetime, None]] = self._jobs_index.get(
|
||||
job.id, (None, None)
|
||||
@ -107,9 +131,8 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
# Otherwise, reinsert the job to the list to preserve the ordering.
|
||||
old_index = self._get_job_index(old_timestamp, old_job.id)
|
||||
new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
|
||||
asyncio.ensure_future(
|
||||
self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp),
|
||||
loop=self.bot.loop,
|
||||
asyncio.create_task(
|
||||
self._async_update_job(job, new_timestamp, old_index, old_job, old_timestamp)
|
||||
)
|
||||
|
||||
async def _async_update_job(self, job, new_timestamp, old_index, old_job, old_timestamp):
|
||||
@ -131,6 +154,7 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
log.debug(f"Async Updated {job.id=}")
|
||||
log.debug(f"Check job args: {job.args=}")
|
||||
|
||||
@run_in_event_loop
|
||||
def remove_job(self, job_id):
|
||||
job, timestamp = self._jobs_index.get(job_id, (None, None))
|
||||
if job is None:
|
||||
@ -146,6 +170,7 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
del jobs[index]
|
||||
await self.config.jobs_index.clear_raw(job.id)
|
||||
|
||||
@run_in_event_loop
|
||||
def remove_all_jobs(self):
|
||||
super().remove_all_jobs()
|
||||
asyncio.create_task(self._async_remove_all_jobs())
|
||||
@ -154,6 +179,10 @@ class RedConfigJobStore(MemoryJobStore):
|
||||
await self.config.jobs.clear()
|
||||
await self.config.jobs_index.clear()
|
||||
|
||||
def shutdown(self):
|
||||
"""Removes all jobs without clearing config"""
|
||||
super().remove_all_jobs()
|
||||
|
||||
|
||||
# import asyncio
|
||||
#
|
||||
|
325
fifo/task.py
Normal file
325
fifo/task.py
Normal file
@ -0,0 +1,325 @@
|
||||
import logging
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, List, Union
|
||||
|
||||
import discord
|
||||
from apscheduler.triggers.base import BaseTrigger
|
||||
from apscheduler.triggers.combining import OrTrigger
|
||||
from apscheduler.triggers.date import DateTrigger
|
||||
from apscheduler.triggers.interval import IntervalTrigger
|
||||
from apscheduler.triggers.cron import CronTrigger
|
||||
from discord.utils import time_snowflake
|
||||
from redbot.core import Config, commands
|
||||
from redbot.core.bot import Red
|
||||
|
||||
log = logging.getLogger("red.fox_v3.fifo.task")
|
||||
|
||||
|
||||
async def _do_nothing(*args, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
def get_trigger(data):
|
||||
if data["type"] == "interval":
|
||||
parsed_time = data["time_data"]
|
||||
return IntervalTrigger(days=parsed_time.days, seconds=parsed_time.seconds)
|
||||
|
||||
if data["type"] == "date":
|
||||
return DateTrigger(data["time_data"])
|
||||
|
||||
if data["type"] == "cron":
|
||||
return CronTrigger.from_crontab(data["time_data"])
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def parse_triggers(data: Union[Dict, None]):
|
||||
if data is None or not data.get("triggers", False): # No triggers
|
||||
return None
|
||||
|
||||
if len(data["triggers"]) > 1: # Multiple triggers
|
||||
return OrTrigger(get_trigger(t_data) for t_data in data["triggers"])
|
||||
|
||||
return get_trigger(data["triggers"][0])
|
||||
|
||||
|
||||
class FakeMessage:
|
||||
def __init__(self, message: discord.Message):
|
||||
d = {k: getattr(message, k, None) for k in dir(message)}
|
||||
self.__dict__.update(**d)
|
||||
|
||||
|
||||
def neuter_message(message: FakeMessage):
|
||||
message.delete = _do_nothing
|
||||
message.edit = _do_nothing
|
||||
message.publish = _do_nothing
|
||||
message.pin = _do_nothing
|
||||
message.unpin = _do_nothing
|
||||
message.add_reaction = _do_nothing
|
||||
message.remove_reaction = _do_nothing
|
||||
message.clear_reaction = _do_nothing
|
||||
message.clear_reactions = _do_nothing
|
||||
message.ack = _do_nothing
|
||||
|
||||
return message
|
||||
|
||||
|
||||
class Task:
|
||||
default_task_data = {"triggers": [], "command_str": ""}
|
||||
|
||||
default_trigger = {
|
||||
"type": "",
|
||||
"time_data": None, # Used for Interval and Date Triggers
|
||||
}
|
||||
|
||||
def __init__(
|
||||
self, name: str, guild_id, config: Config, author_id=None, channel_id=None, bot: Red = None
|
||||
):
|
||||
self.name = name
|
||||
self.guild_id = guild_id
|
||||
self.config = config
|
||||
self.bot = bot
|
||||
self.author_id = author_id
|
||||
self.channel_id = channel_id
|
||||
self.data = None
|
||||
|
||||
async def _encode_time_triggers(self):
|
||||
if not self.data or not self.data.get("triggers", None):
|
||||
return []
|
||||
|
||||
triggers = []
|
||||
for t in self.data["triggers"]:
|
||||
if t["type"] == "interval": # Convert into timedelta
|
||||
td: timedelta = t["time_data"]
|
||||
|
||||
triggers.append(
|
||||
{"type": t["type"], "time_data": {"days": td.days, "seconds": td.seconds}}
|
||||
)
|
||||
continue
|
||||
|
||||
if t["type"] == "date": # Convert into datetime
|
||||
dt: datetime = t["time_data"]
|
||||
triggers.append({"type": t["type"], "time_data": dt.isoformat()})
|
||||
# triggers.append(
|
||||
# {
|
||||
# "type": t["type"],
|
||||
# "time_data": {
|
||||
# "year": dt.year,
|
||||
# "month": dt.month,
|
||||
# "day": dt.day,
|
||||
# "hour": dt.hour,
|
||||
# "minute": dt.minute,
|
||||
# "second": dt.second,
|
||||
# "tzinfo": dt.tzinfo,
|
||||
# },
|
||||
# }
|
||||
# )
|
||||
continue
|
||||
|
||||
if t["type"] == "cron": # TODO: Implement this, should be easy
|
||||
raise NotImplemented
|
||||
raise NotImplemented
|
||||
|
||||
return triggers
|
||||
|
||||
async def _decode_time_triggers(self):
|
||||
if not self.data or not self.data.get("triggers", None):
|
||||
return
|
||||
|
||||
for n, t in enumerate(self.data["triggers"]):
|
||||
if t["type"] == "interval": # Convert into timedelta
|
||||
self.data["triggers"][n]["time_data"] = timedelta(**t["time_data"])
|
||||
continue
|
||||
|
||||
if t["type"] == "date": # Convert into datetime
|
||||
# self.data["triggers"][n]["time_data"] = datetime(**t["time_data"])
|
||||
self.data["triggers"][n]["time_data"] = datetime.fromisoformat(t["time_data"])
|
||||
continue
|
||||
|
||||
if t["type"] == "cron":
|
||||
raise NotImplemented
|
||||
raise NotImplemented
|
||||
|
||||
# async def load_from_data(self, data: Dict):
|
||||
# self.data = data.copy()
|
||||
|
||||
async def load_from_config(self):
|
||||
data = await self.config.guild_from_id(self.guild_id).tasks.get_raw(
|
||||
self.name, default=None
|
||||
)
|
||||
|
||||
if not data:
|
||||
return
|
||||
|
||||
self.author_id = data["author_id"]
|
||||
self.guild_id = data["guild_id"]
|
||||
self.channel_id = data["channel_id"]
|
||||
|
||||
self.data = data["data"]
|
||||
|
||||
await self._decode_time_triggers()
|
||||
return self.data
|
||||
|
||||
async def get_triggers(self) -> List[Union[IntervalTrigger, DateTrigger]]:
|
||||
if not self.data:
|
||||
await self.load_from_config()
|
||||
|
||||
if self.data is None or "triggers" not in self.data: # No triggers
|
||||
return []
|
||||
|
||||
return [get_trigger(t) for t in self.data["triggers"]]
|
||||
|
||||
async def get_combined_trigger(self) -> Union[BaseTrigger, None]:
|
||||
if not self.data:
|
||||
await self.load_from_config()
|
||||
|
||||
return parse_triggers(self.data)
|
||||
|
||||
# async def set_job_id(self, job_id):
|
||||
# if self.data is None:
|
||||
# await self.load_from_config()
|
||||
#
|
||||
# self.data["job_id"] = job_id
|
||||
|
||||
async def save_all(self):
|
||||
"""To be used when creating an new task"""
|
||||
|
||||
data_to_save = self.default_task_data.copy()
|
||||
if self.data:
|
||||
data_to_save["command_str"] = self.get_command_str()
|
||||
data_to_save["triggers"] = await self._encode_time_triggers()
|
||||
|
||||
to_save = {
|
||||
"guild_id": self.guild_id,
|
||||
"author_id": self.author_id,
|
||||
"channel_id": self.channel_id,
|
||||
"data": data_to_save,
|
||||
}
|
||||
await self.config.guild_from_id(self.guild_id).tasks.set_raw(self.name, value=to_save)
|
||||
|
||||
async def save_data(self):
|
||||
"""To be used when updating triggers"""
|
||||
if not self.data:
|
||||
return
|
||||
|
||||
data_to_save = self.data.copy()
|
||||
data_to_save["triggers"] = await self._encode_time_triggers()
|
||||
|
||||
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
|
||||
self.name, "data", value=data_to_save
|
||||
)
|
||||
|
||||
async def execute(self):
|
||||
if not self.data or not self.get_command_str():
|
||||
log.warning(f"Could not execute task due to data problem: {self.data=}")
|
||||
return False
|
||||
|
||||
guild: discord.Guild = self.bot.get_guild(self.guild_id) # used for get_prefix
|
||||
if guild is None:
|
||||
log.warning(f"Could not execute task due to missing guild: {self.guild_id}")
|
||||
return False
|
||||
channel: discord.TextChannel = guild.get_channel(self.channel_id)
|
||||
if channel is None:
|
||||
log.warning(f"Could not execute task due to missing channel: {self.channel_id}")
|
||||
return False
|
||||
author: discord.User = guild.get_member(self.author_id)
|
||||
if author is None:
|
||||
log.warning(f"Could not execute task due to missing author: {self.author_id}")
|
||||
return False
|
||||
|
||||
actual_message: discord.Message = channel.last_message
|
||||
# I'd like to present you my chain of increasingly desperate message fetching attempts
|
||||
if actual_message is None:
|
||||
# log.warning("No message found in channel cache yet, skipping execution")
|
||||
# return
|
||||
actual_message = await channel.fetch_message(channel.last_message_id)
|
||||
if actual_message is None: # last_message_id was an invalid message I guess
|
||||
actual_message = await channel.history(limit=1).flatten()
|
||||
if not actual_message: # Basically only happens if the channel has no messages
|
||||
actual_message = await author.history(limit=1).flatten()
|
||||
if not actual_message: # Okay, the *author* has never sent a message?
|
||||
log.warning("No message found in channel cache yet, skipping execution")
|
||||
return
|
||||
actual_message = actual_message[0]
|
||||
|
||||
message = FakeMessage(actual_message)
|
||||
# message = FakeMessage2
|
||||
message.author = author
|
||||
message.guild = guild # Just in case we got desperate
|
||||
message.channel = channel
|
||||
message.id = time_snowflake(datetime.now()) # Pretend to be now
|
||||
message = neuter_message(message)
|
||||
|
||||
# absolutely weird that this takes a message object instead of guild
|
||||
prefixes = await self.bot.get_prefix(message)
|
||||
if isinstance(prefixes, str):
|
||||
prefix = prefixes
|
||||
else:
|
||||
prefix = prefixes[0]
|
||||
|
||||
message.content = f"{prefix}{self.get_command_str()}"
|
||||
|
||||
if not message.guild or not message.author or not message.content:
|
||||
log.warning(f"Could not execute task due to message problem: {message}")
|
||||
return False
|
||||
|
||||
new_ctx: commands.Context = await self.bot.get_context(message)
|
||||
new_ctx.assume_yes = True
|
||||
if not new_ctx.valid:
|
||||
log.warning(f"Could not execute task due invalid context: {new_ctx}")
|
||||
return False
|
||||
|
||||
await self.bot.invoke(new_ctx)
|
||||
return True
|
||||
|
||||
async def set_bot(self, bot: Red):
|
||||
self.bot = bot
|
||||
|
||||
async def set_author(self, author: Union[discord.User, discord.Member, str]):
|
||||
self.author_id = getattr(author, "id", None) or author
|
||||
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
|
||||
self.name, "author_id", value=self.author_id
|
||||
)
|
||||
|
||||
async def set_channel(self, channel: Union[discord.TextChannel, str]):
|
||||
self.channel_id = getattr(channel, "id", None) or channel
|
||||
await self.config.guild_from_id(self.guild_id).tasks.set_raw(
|
||||
self.name, "channel_id", value=self.author_id
|
||||
)
|
||||
|
||||
def get_command_str(self):
|
||||
return self.data.get("command_str", "")
|
||||
|
||||
async def set_commmand_str(self, command_str):
|
||||
if not self.data:
|
||||
self.data = self.default_task_data.copy()
|
||||
self.data["command_str"] = command_str
|
||||
return True
|
||||
|
||||
async def add_trigger(self, param, parsed_time: Union[timedelta, datetime, str]):
|
||||
trigger_data = {"type": param, "time_data": parsed_time}
|
||||
if not get_trigger(trigger_data):
|
||||
return False
|
||||
|
||||
if not self.data:
|
||||
self.data = self.default_task_data.copy()
|
||||
|
||||
self.data["triggers"].append(trigger_data)
|
||||
return True
|
||||
|
||||
def __setstate__(self, task_state):
|
||||
self.name = task_state["name"]
|
||||
self.guild_id = task_state["guild_id"]
|
||||
self.config = task_state["config"]
|
||||
self.bot = None
|
||||
self.author_id = None
|
||||
self.channel_id = None
|
||||
self.data = None
|
||||
|
||||
def __getstate__(self):
|
||||
return {
|
||||
"name": self.name,
|
||||
"guild_id": self.guild_id,
|
||||
"config": self.config,
|
||||
"bot": self.bot,
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user