|
|
|
"""Module to manage audio trivia sessions."""
|
|
|
|
import asyncio
|
|
|
|
import logging
|
|
|
|
|
|
|
|
from redbot.cogs.trivia import TriviaSession
|
|
|
|
from redbot.cogs.trivia.session import _parse_answers
|
|
|
|
from redbot.core.utils.chat_formatting import bold
|
|
|
|
|
|
|
|
log = logging.getLogger("red.fox_v3.audiotrivia.audiosession")
|
|
|
|
|
|
|
|
|
|
|
|
class AudioSession(TriviaSession):
|
|
|
|
"""Class to run a session of audio trivia"""
|
|
|
|
|
|
|
|
def __init__(self, ctx, question_list: dict, settings: dict, audio=None):
|
|
|
|
super().__init__(ctx, question_list, settings)
|
|
|
|
|
|
|
|
self.audio = audio
|
|
|
|
|
|
|
|
@classmethod
|
|
|
|
def start(cls, ctx, question_list, settings, audio=None):
|
|
|
|
session = cls(ctx, question_list, settings, audio)
|
|
|
|
loop = ctx.bot.loop
|
|
|
|
session._task = loop.create_task(session.run())
|
|
|
|
return session
|
|
|
|
|
|
|
|
async def run(self):
|
|
|
|
"""Run the audio trivia session.
|
|
|
|
|
|
|
|
In order for the trivia session to be stopped correctly, this should
|
|
|
|
only be called internally by `TriviaSession.start`.
|
|
|
|
"""
|
|
|
|
await self._send_startup_msg()
|
|
|
|
max_score = self.settings["max_score"]
|
|
|
|
delay = self.settings["delay"]
|
|
|
|
audio_delay = self.settings["audio_delay"]
|
|
|
|
timeout = self.settings["timeout"]
|
|
|
|
if self.audio is not None:
|
|
|
|
import lavalink
|
|
|
|
|
|
|
|
player = lavalink.get_player(self.ctx.guild.id)
|
|
|
|
player.store("channel", self.ctx.channel.id) # What's this for? I dunno
|
|
|
|
await self.audio.set_player_settings(self.ctx)
|
|
|
|
else:
|
|
|
|
lavalink = None
|
|
|
|
player = False
|
|
|
|
|
|
|
|
for question, answers, audio_url in self._iter_questions():
|
|
|
|
async with self.ctx.typing():
|
|
|
|
await asyncio.sleep(3)
|
|
|
|
self.count += 1
|
|
|
|
msg = bold(f"Question number {self.count}!") + f"\n\n{question}"
|
|
|
|
if player:
|
|
|
|
await player.stop()
|
|
|
|
if audio_url:
|
|
|
|
if not player:
|
|
|
|
log.debug("Got an audio question in a non-audio trivia session")
|
|
|
|
continue
|
|
|
|
|
|
|
|
load_result = await player.load_tracks(audio_url)
|
|
|
|
if (
|
|
|
|
load_result.has_error
|
|
|
|
or load_result.load_type != lavalink.enums.LoadType.TRACK_LOADED
|
|
|
|
):
|
|
|
|
await self.ctx.maybe_send_embed(
|
|
|
|
"Audio Track has an error, skipping. See logs for details"
|
|
|
|
)
|
|
|
|
log.info(f"Track has error: {load_result.exception_message}")
|
|
|
|
continue
|
|
|
|
tracks = load_result.tracks
|
|
|
|
track = tracks[0]
|
|
|
|
seconds = track.length / 1000
|
|
|
|
track.uri = "" # Hide the info from `now`
|
|
|
|
if self.settings["repeat"] and seconds < audio_delay:
|
|
|
|
# Append it until it's longer than the delay
|
|
|
|
tot_length = seconds + 0
|
|
|
|
while tot_length < audio_delay:
|
|
|
|
player.add(self.ctx.author, track)
|
|
|
|
tot_length += seconds
|
|
|
|
else:
|
|
|
|
player.add(self.ctx.author, track)
|
|
|
|
|
|
|
|
if not player.current:
|
|
|
|
await player.play()
|
|
|
|
await self.ctx.maybe_send_embed(msg)
|
|
|
|
log.debug(f"Audio question: {question}")
|
|
|
|
|
|
|
|
continue_ = await self.wait_for_answer(
|
|
|
|
answers, audio_delay if audio_url else delay, timeout
|
|
|
|
)
|
|
|
|
if continue_ is False:
|
|
|
|
break
|
|
|
|
if any(score >= max_score for score in self.scores.values()):
|
|
|
|
await self.end_game()
|
|
|
|
break
|
|
|
|
else:
|
|
|
|
await self.ctx.maybe_send_embed("There are no more questions!")
|
|
|
|
await self.end_game()
|
|
|
|
|
|
|
|
async def end_game(self):
|
|
|
|
await super().end_game()
|
|
|
|
if self.audio is not None:
|
|
|
|
await self.ctx.invoke(self.audio.command_disconnect)
|
|
|
|
|
|
|
|
def _iter_questions(self):
|
|
|
|
"""Iterate over questions and answers for this session.
|
|
|
|
|
|
|
|
Yields
|
|
|
|
------
|
|
|
|
`tuple`
|
|
|
|
A tuple containing the question (`str`) and the answers (`tuple` of
|
|
|
|
`str`).
|
|
|
|
|
|
|
|
"""
|
|
|
|
for question, q_data in self.question_list:
|
|
|
|
answers = _parse_answers(q_data["answers"])
|
|
|
|
_audio = q_data["audio"]
|
|
|
|
if _audio:
|
|
|
|
yield _audio, answers, question.strip("<>")
|
|
|
|
else:
|
|
|
|
yield question, answers, _audio
|