You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
790 lines
28 KiB
790 lines
28 KiB
import asyncio
|
|
import json
|
|
import logging
|
|
import pathlib
|
|
from collections import defaultdict
|
|
from io import BytesIO
|
|
from shutil import copyfile
|
|
from typing import Optional, Union, Dict
|
|
|
|
import discord
|
|
from PIL import Image, ImageColor, ImageOps, ImageFont
|
|
from discord.ext.commands import Greedy
|
|
from redbot.core import Config, commands
|
|
from redbot.core.commands import Context
|
|
from redbot.core.bot import Red
|
|
from redbot.core.data_manager import bundled_data_path, cog_data_path
|
|
from redbot.core.utils.predicates import MessagePredicate
|
|
|
|
from conquest import regioner
|
|
from conquest.conquestgame import ConquestGame
|
|
|
|
ERROR_CONQUEST_SET_MAP = "No map is currently set. See `[p]conquest set map`"
|
|
|
|
log = logging.getLogger("red.fox_v3.conquest")
|
|
|
|
|
|
class Conquest(commands.Cog):
|
|
"""
|
|
Create and modify maps for RPGs and War Games
|
|
"""
|
|
|
|
default_maps_json = {"maps": []}
|
|
|
|
# Usage: await self.config.games.get_raw("game_name", "is_custom")
|
|
default_games = {"map_name": None, "is_custom": False}
|
|
|
|
ext = "PNG"
|
|
ext_format = "PNG"
|
|
|
|
def __init__(self, bot: Red):
|
|
super().__init__()
|
|
self.bot = bot
|
|
self.config = Config.get_conf(
|
|
self, identifier=67_111_110_113_117_101_115_116, force_registration=True
|
|
)
|
|
|
|
default_guild = {"current_game": None}
|
|
default_global = {"games": {}}
|
|
self.config.register_guild(**default_guild)
|
|
self.config.register_global(**default_global)
|
|
|
|
self.data_path: pathlib.Path = cog_data_path(self)
|
|
|
|
self.custom_map_folder = self.data_path / "custom_maps"
|
|
if not self.custom_map_folder.exists() or not self.custom_map_folder.is_dir():
|
|
self.custom_map_folder.mkdir()
|
|
with (self.custom_map_folder / "maps.json").open("w+") as dj:
|
|
json.dump(self.default_maps_json.copy(), dj, sort_keys=True, indent=4)
|
|
|
|
self.current_map_folder = self.data_path / "current_maps"
|
|
if not self.current_map_folder.exists() or not self.current_map_folder.is_dir():
|
|
self.current_map_folder.mkdir()
|
|
|
|
self.asset_path: Optional[pathlib.Path] = None
|
|
|
|
self.current_games: Dict[int, Optional[ConquestGame]] = defaultdict(
|
|
lambda: None
|
|
) # key: guild_id
|
|
self.map_data = {} # key, value = guild.id, ConquestGame
|
|
|
|
self.mm: Optional[regioner.MapMaker] = None
|
|
|
|
async def red_delete_data_for_user(self, **kwargs):
|
|
"""Nothing to delete"""
|
|
return
|
|
|
|
def _path_if_custom(self, custom_custom: bool = None) -> pathlib.Path:
|
|
check_value = custom_custom # if custom_custom is not None else self.is_custom
|
|
if check_value:
|
|
return self.custom_map_folder
|
|
return self.asset_path
|
|
|
|
async def load_data(self):
|
|
"""
|
|
Initial loading of data from bundled_data_path and config
|
|
"""
|
|
self.asset_path = bundled_data_path(self) / "assets"
|
|
for guild in self.bot.guilds:
|
|
game_data = await self.config.guild(guild).current_game()
|
|
if game_data is not None:
|
|
await self.load_guild_data(guild, **game_data)
|
|
|
|
# regioner.MAP_FONT = ImageFont.truetype(
|
|
# str(bundled_data_path(self) / "fonts" / "smallest_pixel_7" / "smallest_pixel-7.ttf"), size=10
|
|
# )
|
|
|
|
# regioner.MAP_FONT = ImageFont.truetype(
|
|
# str(bundled_data_path(self) / "fonts" / "bit01" / "bit01.ttf"), size=4
|
|
# )
|
|
|
|
regioner.MAP_FONT = ImageFont.truetype(
|
|
str(bundled_data_path(self) / "fonts" / "pixels" / "Pixels.ttf"), size=16
|
|
)
|
|
|
|
# for guild_id, game_name in self.current_maps.items():
|
|
# await self.current_map_load(guild_id, game_name)
|
|
|
|
async def load_guild_data(self, guild: discord.Guild, map_name: str, is_custom: bool):
|
|
# map_data = await self.config.games.get_raw(game_name)
|
|
# if map_data is None:
|
|
# return False
|
|
# map_name = map_data["map_name"]
|
|
map_path = self._path_if_custom(is_custom) / map_name
|
|
|
|
if (
|
|
not (self.current_map_folder / str(guild.id)).exists()
|
|
or not (self.current_map_folder / str(guild.id) / map_name).exists()
|
|
):
|
|
return False
|
|
|
|
self.current_games[guild.id] = ConquestGame(
|
|
map_path, map_name, self.current_map_folder / str(guild.id) / map_name
|
|
)
|
|
|
|
return True
|
|
|
|
# async def current_map_load(self):
|
|
# map_path = self._path_if_custom()
|
|
# self.map_data = ConquestMap(map_path / self.current_map)
|
|
# await self.map_data.load_data()
|
|
# # map_data_path = map_path / self.current_map / "data.json"
|
|
# # try:
|
|
# # with map_data_path.open() as mapdata:
|
|
# # self.map_data: dict = json.load(mapdata)
|
|
# # except FileNotFoundError as e:
|
|
# # print(e)
|
|
# # await self.config.current_map.set(None)
|
|
# # return
|
|
|
|
# async def _get_current_map_folder(self, guild):
|
|
# return self.current_map_folder / guild.id / self.current_map
|
|
|
|
async def _mm_save_map(self, map_name, target_save):
|
|
return await self.mm.change_name(map_name, target_save)
|
|
|
|
@commands.is_owner()
|
|
@commands.group()
|
|
async def mapmaker(self, ctx: Context):
|
|
"""
|
|
Base command for managing current maps or creating new ones
|
|
"""
|
|
if ctx.invoked_subcommand is None:
|
|
pass
|
|
|
|
@mapmaker.command(name="numbers")
|
|
async def _mapmaker_numbers(self, ctx: Context):
|
|
"""Regenerates the number mask and puts it in the channel"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
await self.mm.create_number_mask()
|
|
im = await self.mm.get_blank_numbered_file()
|
|
|
|
await ctx.send(file=discord.File(im, filename="map.png"))
|
|
|
|
@mapmaker.command(name="close")
|
|
async def _mapmaker_close(self, ctx: Context):
|
|
"""Close the currently open map."""
|
|
self.mm = None
|
|
|
|
await ctx.tick()
|
|
|
|
@mapmaker.group(name="debug")
|
|
async def _mapmaker_debug(self, ctx: Context):
|
|
"""Debug commands for making maps. Don't use unless directed to."""
|
|
pass
|
|
|
|
@_mapmaker_debug.command(name="recalculate")
|
|
async def _mapmaker_debug_recalculate(self, ctx: Context, mask_list: Greedy[int]):
|
|
"""Recaculate the center point and weight for the given region.
|
|
|
|
Processes all regions if region isn't specified."""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
await self.mm.recalculate_region(mask_list)
|
|
|
|
await ctx.tick()
|
|
|
|
@_mapmaker_debug.command(name="sort")
|
|
async def _mapmaker_debug_sort(self, ctx: Context):
|
|
"""Sorts the regions by position on the map."""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
await self.mm.sort_regions()
|
|
|
|
await ctx.tick()
|
|
|
|
@mapmaker.command(name="save")
|
|
async def _mapmaker_save(self, ctx: Context, *, map_name: str):
|
|
"""Save the current map to a different map name"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
if self.mm.name == map_name:
|
|
await ctx.maybe_send_embed("This map already has that name, no reason to save")
|
|
return
|
|
|
|
target_save = self.custom_map_folder / map_name
|
|
|
|
result = await self._mm_save_map(map_name, target_save)
|
|
if not result:
|
|
await ctx.maybe_send_embed("Failed to save to that name")
|
|
else:
|
|
await ctx.maybe_send_embed(f"Map successfully saved to {target_save}")
|
|
|
|
@mapmaker.command(name="upload")
|
|
async def _mapmaker_upload(self, ctx: Context, map_name: str, path_to_image=""):
|
|
"""Load a map image to be modified. Upload one with this command or provide a path"""
|
|
message: discord.Message = ctx.message
|
|
if not message.attachments and not path_to_image:
|
|
await ctx.maybe_send_embed(
|
|
"Either upload an image with this command or provide a path to the image"
|
|
)
|
|
return
|
|
|
|
target_save = self.custom_map_folder / map_name
|
|
|
|
if target_save.exists() and target_save.is_dir():
|
|
await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?")
|
|
|
|
pred = MessagePredicate.yes_or_no(ctx)
|
|
try:
|
|
await self.bot.wait_for("message", check=pred, timeout=30)
|
|
except TimeoutError:
|
|
await ctx.maybe_send_embed("Response timed out, cancelling save")
|
|
return
|
|
if not pred.result:
|
|
return
|
|
|
|
if self.mm: # Only one map can be worked on at a time
|
|
await ctx.maybe_send_embed(
|
|
"An existing map is in progress, close it before opening a new one. (`[p]mapmaker close`)"
|
|
)
|
|
return
|
|
|
|
async with ctx.typing():
|
|
self.mm = regioner.MapMaker(target_save)
|
|
self.mm.custom = True
|
|
|
|
if path_to_image:
|
|
path_to_image = pathlib.Path(path_to_image)
|
|
|
|
if not path_to_image.exists():
|
|
await ctx.maybe_send_embed("Map not found at that path")
|
|
return
|
|
|
|
mm_img = Image.open(path_to_image)
|
|
|
|
elif message.attachments:
|
|
attch: discord.Attachment = message.attachments[0]
|
|
# attch_file = await attch.to_file()
|
|
|
|
buffer = BytesIO()
|
|
await attch.save(buffer)
|
|
|
|
mm_img: Image.Image = Image.open(buffer)
|
|
else:
|
|
# Wait what?
|
|
return
|
|
|
|
if mm_img.mode == "P":
|
|
# Maybe convert to L to prevent RGB?
|
|
mm_img = mm_img.convert() # No P mode, convert it
|
|
|
|
result = await self.mm.init_directory(map_name, target_save, mm_img)
|
|
|
|
if not result:
|
|
self.mm = None
|
|
await ctx.maybe_send_embed("Failed to upload to that name")
|
|
return
|
|
|
|
maps_json_path = self.custom_map_folder / "maps.json"
|
|
with maps_json_path.open("r+") as maps:
|
|
map_data = json.load(maps)
|
|
map_data["maps"].append(map_name)
|
|
maps.seek(0)
|
|
json.dump(map_data, maps, sort_keys=True, indent=4)
|
|
|
|
await ctx.maybe_send_embed(f"Map successfully uploaded to {target_save}")
|
|
|
|
@mapmaker.command(name="sample")
|
|
async def _mapmaker_sample(self, ctx: Context, region: int = None):
|
|
"""Print the currently being modified map as a sample"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
if region is not None and region not in self.mm.regions:
|
|
await ctx.send("This region doesn't exist or was deleted")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
|
|
files = await self.mm.get_sample(region)
|
|
|
|
for f in files:
|
|
await ctx.send(file=discord.File(f, filename="map.png"))
|
|
|
|
await ctx.tick()
|
|
|
|
@mapmaker.command(name="region")
|
|
async def _mapmaker_region(self, ctx: Context, region: int, print_number: bool = False):
|
|
"""Print the currently being modified map as a sample"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
if region not in self.mm.regions:
|
|
await ctx.send("This region doesn't exist or was deleted")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
|
|
files = await self.mm.sample_region(region, print_number)
|
|
|
|
for f in files:
|
|
await ctx.send(file=discord.File(f, filename="map.png"))
|
|
|
|
await ctx.tick()
|
|
|
|
@mapmaker.command(name="load")
|
|
async def _mapmaker_load(self, ctx: Context, map_name: str):
|
|
"""Load an existing map to be modified."""
|
|
if self.mm:
|
|
await ctx.maybe_send_embed(
|
|
"There is a current map in progress. Close it first with `[p]mapmaker close`"
|
|
)
|
|
return
|
|
|
|
map_path = self.custom_map_folder / map_name
|
|
|
|
if not map_path.exists() or not map_path.is_dir():
|
|
await ctx.maybe_send_embed(f"Map {map_name} not found in {self.custom_map_folder}")
|
|
return
|
|
|
|
self.mm = regioner.MapMaker(map_path)
|
|
self.mm.load_data()
|
|
|
|
await ctx.tick()
|
|
|
|
@mapmaker.group(name="masks")
|
|
async def _mapmaker_masks(self, ctx: Context):
|
|
"""Base command for managing map masks"""
|
|
if ctx.invoked_subcommand is None:
|
|
pass
|
|
|
|
@_mapmaker_masks.command(name="generate")
|
|
async def _mapmaker_masks_generate(self, ctx: Context):
|
|
"""
|
|
Generate masks for the map
|
|
|
|
Currently only works on maps with black borders and white regions.
|
|
Non-white regions are ignored (i.e. blue water)
|
|
"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
masks_dir = self.mm.masks_path()
|
|
if masks_dir.exists() and masks_dir.is_dir():
|
|
await ctx.maybe_send_embed("Mask folder already exists, delete this before continuing")
|
|
return
|
|
|
|
with ctx.typing():
|
|
regions = await self.mm.generate_masks()
|
|
|
|
if not regions:
|
|
await ctx.maybe_send_embed("Failed to generate masks")
|
|
return
|
|
|
|
await ctx.maybe_send_embed(f"{len(regions)} masks generated into {masks_dir}")
|
|
|
|
@_mapmaker_masks.command(name="delete")
|
|
async def _mapmaker_masks_delete(self, ctx: Context, mask_list: Greedy[int]):
|
|
"""
|
|
Delete the listed masks from the map
|
|
"""
|
|
if not mask_list:
|
|
await ctx.send_help()
|
|
return
|
|
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
masks_dir = self.mm.masks_path()
|
|
if not masks_dir.exists() or not masks_dir.is_dir():
|
|
await ctx.maybe_send_embed("There are no masks")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
result = await self.mm.delete_masks(mask_list)
|
|
if result:
|
|
await ctx.maybe_send_embed(f"Delete masks: {mask_list}")
|
|
else:
|
|
await ctx.maybe_send_embed(f"Failed to delete masks")
|
|
|
|
@_mapmaker_masks.command(name="cleanup", aliases=["reduce", "prune"])
|
|
async def _mapmaker_masks_prune(self, ctx: Context):
|
|
"""
|
|
Removes deleted masks, and reduces the ID numbers down to the lowest available.
|
|
|
|
Warning: Backup your files before running this. Any errors will break your masks.
|
|
"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
masks_dir = self.mm.masks_path()
|
|
if not masks_dir.exists() or not masks_dir.is_dir():
|
|
await ctx.maybe_send_embed("There are no masks")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
result = await self.mm.prune_masks()
|
|
if result:
|
|
await ctx.maybe_send_embed(f"Pruned masks: {result}")
|
|
else:
|
|
await ctx.maybe_send_embed(f"Failed to cleanup masks")
|
|
|
|
@_mapmaker_masks.command(name="convert")
|
|
async def _mapmaker_masks_convert(self, ctx: Context, mask_list: Greedy[int]):
|
|
"""
|
|
Converts the provided mask images into 1-bit black-only masks.
|
|
Processes all masks if mask_list is empty
|
|
|
|
Warning: This may take a while.
|
|
"""
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
masks_dir = self.mm.masks_path()
|
|
if not masks_dir.exists() or not masks_dir.is_dir():
|
|
await ctx.maybe_send_embed("There are no masks")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
result = await self.mm.convert_masks(mask_list)
|
|
if result:
|
|
await ctx.maybe_send_embed(f"All masks converted")
|
|
else:
|
|
await ctx.maybe_send_embed(f"Failed to convert masks")
|
|
|
|
@_mapmaker_masks.command(name="merge", aliases=["combine"])
|
|
async def _mapmaker_masks_combine(
|
|
self, ctx: Context, mask_list: Greedy[int], recommended=False
|
|
):
|
|
"""Merge masks into a single mask"""
|
|
if not mask_list and not recommended:
|
|
await ctx.send_help()
|
|
return
|
|
|
|
if not self.mm:
|
|
await ctx.maybe_send_embed("No map currently being worked on")
|
|
return
|
|
|
|
if recommended and mask_list:
|
|
await ctx.maybe_send_embed(
|
|
"Can't combine recommend masks and a mask list at the same time, pick one"
|
|
)
|
|
return
|
|
|
|
masks_dir = self.mm.masks_path()
|
|
if not masks_dir.exists() or not masks_dir.is_dir():
|
|
await ctx.maybe_send_embed("There are no masks")
|
|
return
|
|
|
|
if not recommended:
|
|
for mask in mask_list: # TODO: switch to self.mm.regions intersection of sets
|
|
m = masks_dir / f"{mask}.png"
|
|
if not m.exists():
|
|
await ctx.maybe_send_embed(f"Mask #{mask} does not exist")
|
|
return
|
|
else:
|
|
await ctx.send("Not Implemented")
|
|
return
|
|
|
|
async with ctx.typing():
|
|
result = await self.mm.combine_masks(mask_list)
|
|
if not result:
|
|
await ctx.maybe_send_embed(
|
|
"Failed to combine masks, try the command again or check log for errors"
|
|
)
|
|
return
|
|
await ctx.maybe_send_embed(f"Combined masks into mask # {result}")
|
|
|
|
@commands.group()
|
|
async def conquest(self, ctx: Context):
|
|
"""
|
|
Base command for conquest cog. Start with `[p]conquest set map` to select a map.
|
|
"""
|
|
# if ctx.invoked_subcommand is None:
|
|
# if self.current_maps[ctx.guild.id] is not None:
|
|
# await self._conquest_current(ctx)
|
|
|
|
@conquest.command(name="list")
|
|
async def _conquest_list(self, ctx: Context):
|
|
"""
|
|
List maps available for starting a Conquest game.
|
|
"""
|
|
maps_json = self.asset_path / "maps.json"
|
|
with maps_json.open() as maps:
|
|
maps_json = json.load(maps)
|
|
map_list = maps_json["maps"]
|
|
|
|
maps_json = self.custom_map_folder / "maps.json"
|
|
if maps_json.exists():
|
|
with maps_json.open() as maps:
|
|
maps_json = json.load(maps)
|
|
custom_map_list = maps_json["maps"]
|
|
|
|
map_list = "\n".join(map_list)
|
|
custom_map_list = "\n".join(custom_map_list)
|
|
await ctx.maybe_send_embed(f"Current maps:\n{map_list}\n\nCustom maps:\n{custom_map_list}")
|
|
|
|
@conquest.group(name="set")
|
|
async def conquest_set(self, ctx: Context):
|
|
"""Base command for admin actions like selecting a map"""
|
|
if ctx.invoked_subcommand is None:
|
|
pass
|
|
|
|
@conquest_set.command(name="resetzoom")
|
|
async def _conquest_set_resetzoom(self, ctx: Context):
|
|
"""Resets the zoom level of the current map"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
if not await current_game.reset_zoom():
|
|
await ctx.maybe_send_embed(f"No zoom data found, reset not needed")
|
|
await ctx.tick()
|
|
|
|
@conquest_set.command(name="zoom")
|
|
async def _conquest_set_zoom(self, ctx: Context, x: int, y: int, zoom: float):
|
|
"""
|
|
Set the zoom level and position of the current map
|
|
|
|
x: positive integer
|
|
y: positive integer
|
|
zoom: float greater than or equal to 1
|
|
"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
if x < 0 or y < 0 or zoom < 1:
|
|
await ctx.send_help()
|
|
return
|
|
|
|
await current_game.set_zoom(x, y, zoom)
|
|
|
|
await ctx.tick()
|
|
|
|
@conquest_set.command(name="zoomtest")
|
|
async def _conquest_set_zoomtest(self, ctx: Context, x: int, y: int, zoom: float):
|
|
"""
|
|
Test the zoom level and position of the current map
|
|
|
|
x: positive integer
|
|
y: positive integer
|
|
zoom: float greater than or equal to 1
|
|
"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
if x < 0 or y < 0 or zoom < 1:
|
|
await ctx.send_help()
|
|
return
|
|
|
|
# UNUSED: out_of_date marks zoom as oudated, since this overwrite the temp
|
|
zoomed_path = await current_game.create_zoomed_map(
|
|
x, y, zoom, current_game.current_map, current_game.zoomed_current_map
|
|
)
|
|
|
|
await ctx.send(file=discord.File(fp=zoomed_path, filename=f"test_zoom.{self.ext}"))
|
|
|
|
@conquest_set.command(name="save")
|
|
async def _conquest_set_save(self, ctx: Context, *, save_name):
|
|
"""Save the current map to be loaded later"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
await current_game.save_as(save_name)
|
|
|
|
await ctx.tick()
|
|
|
|
@conquest_set.command(name="load")
|
|
async def _conquest_set_load(self, ctx: Context, *, save_name):
|
|
"""Load a saved map to be the current map"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
if not await current_game.load_from(save_name):
|
|
await ctx.maybe_send_embed(f"Saved map not found, check your spelling")
|
|
return
|
|
|
|
await ctx.tick()
|
|
|
|
@conquest_set.command(name="map")
|
|
async def _conquest_set_map(
|
|
self, ctx: Context, mapname: str, reset: bool = False, is_custom: bool = False
|
|
):
|
|
"""
|
|
Select a map from current available maps
|
|
|
|
To add more maps, see the guide (WIP)
|
|
"""
|
|
check_path = self._path_if_custom(is_custom)
|
|
|
|
map_dir = check_path / mapname
|
|
if not map_dir.exists() or not map_dir.is_dir():
|
|
await ctx.maybe_send_embed(
|
|
f"Map `{mapname}` was not found in the {check_path} directory"
|
|
)
|
|
return
|
|
|
|
guild_folder = self.current_map_folder / str(ctx.guild.id)
|
|
if not guild_folder.exists():
|
|
guild_folder.mkdir()
|
|
self.current_games[ctx.guild.id] = ConquestGame(map_dir, mapname, guild_folder / mapname)
|
|
|
|
# self.current_map = mapname
|
|
# self.is_custom = is_custom
|
|
# await self.config.current_map.set(self.current_map) # Save to config too
|
|
# await self.config.is_custom.set(is_custom)
|
|
#
|
|
# await self.current_map_load()
|
|
|
|
await self.current_games[ctx.guild.id].resume_game(ctx, reset)
|
|
|
|
new_game = self.default_games.copy()
|
|
new_game["map_name"] = mapname
|
|
new_game["is_custom"] = is_custom
|
|
|
|
await self.config.guild(ctx.guild).current_game.set(new_game)
|
|
|
|
# current_map_folder = await self._get_current_map_folder()
|
|
# current_map = current_map_folder / f"current.{self.ext}"
|
|
#
|
|
# if not reset and current_map.exists():
|
|
# await ctx.maybe_send_embed(
|
|
# "This map is already in progress, resuming from last game\n"
|
|
# "Use `[p]conquest set map [mapname] True` to start a new game"
|
|
# )
|
|
# else:
|
|
# if not current_map_folder.exists():
|
|
# current_map_folder.mkdir()
|
|
# copyfile(check_path / mapname / f"blank.{self.ext}", current_map)
|
|
|
|
await ctx.tick()
|
|
|
|
@conquest.command(name="current")
|
|
async def _conquest_current(self, ctx: Context):
|
|
"""
|
|
Send the current map.
|
|
"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
async with ctx.typing():
|
|
map_file = await current_game.get_maybe_zoomed_map("current")
|
|
await ctx.send(file=map_file)
|
|
|
|
@conquest.command("blank")
|
|
async def _conquest_blank(self, ctx: Context):
|
|
"""
|
|
Print the blank version of the current map, for reference.
|
|
"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
async with ctx.typing():
|
|
map_file = await current_game.get_maybe_zoomed_map("blank")
|
|
await ctx.send(file=map_file)
|
|
|
|
@conquest.command("numbered")
|
|
async def _conquest_numbered(self, ctx: Context):
|
|
"""
|
|
Print the numbered version of the current map, for reference.
|
|
"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
async with ctx.typing():
|
|
map_file = await current_game.get_maybe_zoomed_map("numbered")
|
|
await ctx.send(file=map_file)
|
|
|
|
@conquest.command(name="multitake")
|
|
async def _conquest_multitake(
|
|
self, ctx: Context, start_region: int, end_region: int, color: str
|
|
):
|
|
"""
|
|
Claim all the territories between the two provided region numbers (inclusive)
|
|
|
|
:param start_region:
|
|
"""
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
try:
|
|
color = ImageColor.getrgb(color)
|
|
except ValueError:
|
|
await ctx.maybe_send_embed(f"Invalid color {color}")
|
|
return
|
|
|
|
if start_region > end_region:
|
|
start_region, end_region = end_region, start_region
|
|
|
|
if end_region > current_game.region_max or start_region < 1:
|
|
await ctx.maybe_send_embed(
|
|
f"Max region number is {current_game.region_max}, minimum is 1"
|
|
)
|
|
return
|
|
|
|
regions = [r for r in range(start_region, end_region + 1)]
|
|
|
|
async with ctx.typing():
|
|
await current_game._process_take_regions(color, regions)
|
|
map_file = await current_game.get_maybe_zoomed_map("current")
|
|
|
|
await ctx.send(file=map_file)
|
|
|
|
@conquest.command(name="take")
|
|
async def _conquest_take(self, ctx: Context, regions: Greedy[int], *, color: str):
|
|
"""
|
|
Claim a territory or list of territories for a specified color
|
|
|
|
:param regions: List of integer regions
|
|
:param color: Color to claim regions
|
|
"""
|
|
if not regions:
|
|
await ctx.send_help()
|
|
return
|
|
|
|
current_game = self.current_games[ctx.guild.id]
|
|
if current_game is None:
|
|
await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
|
|
return
|
|
|
|
try:
|
|
color = ImageColor.getrgb(color)
|
|
except ValueError:
|
|
await ctx.maybe_send_embed(f"Invalid color {color}")
|
|
return
|
|
|
|
for region in regions:
|
|
if region > current_game.region_max or region < 1:
|
|
await ctx.maybe_send_embed(
|
|
f"Max region number is {current_game.region_max}, minimum is 1"
|
|
)
|
|
return
|
|
async with ctx.typing():
|
|
await current_game._process_take_regions(color, regions)
|
|
map_file = await current_game.get_maybe_zoomed_map("current")
|
|
|
|
await ctx.send(file=map_file)
|