diff --git a/conquest/conquest.py b/conquest/conquest.py index 3a9a98e..31f168c 100644 --- a/conquest/conquest.py +++ b/conquest/conquest.py @@ -1,8 +1,9 @@ import asyncio import json import pathlib +from io import BytesIO from shutil import copyfile -from typing import Optional +from typing import Optional, Union import discord from PIL import Image, ImageChops, ImageColor, ImageOps @@ -10,16 +11,25 @@ from discord.ext.commands import Greedy from redbot.core import Config, commands from redbot.core.bot import Red from redbot.core.data_manager import bundled_data_path, cog_data_path +from redbot.core.utils.predicates import MessagePredicate + +from conquest.regioner import Regioner, get_center class Conquest(commands.Cog): """ - Cog for + Cog for creating and modifying maps for RPGs and War Games """ default_zoom_json = {"enabled": False, "x": -1, "y": -1, "zoom": 1.0} - default_custom_map = {"name": "", "im": None, "regions": {}, "extension": "png"} + default_custom_map = { + "name": "", + "regions": {}, + "region_max": 0, + "extension": "png", + "custom": True, + } def __init__(self, bot: Red): super().__init__() @@ -29,15 +39,15 @@ class Conquest(commands.Cog): ) default_guild = {} - default_global = {"current_map": None} + default_global = {"current_map": None, "is_custom": False} self.config.register_guild(**default_guild) self.config.register_global(**default_global) self.data_path: pathlib.Path = cog_data_path(self) - self.custom_map_folder = self.data_path / "custom_maps" - if not self.custom_map_folder.exists() or not self.custom_map_folder.is_dir(): - self.custom_map_folder.mkdir() + self.custom_map_path = self.data_path / "custom_maps" + if not self.custom_map_path.exists() or not self.custom_map_path.is_dir(): + self.custom_map_path.mkdir() self.current_map_folder = self.data_path / "current_maps" if not self.current_map_folder.exists() or not self.current_map_folder.is_dir(): @@ -45,29 +55,39 @@ class Conquest(commands.Cog): self.asset_path: Optional[pathlib.Path] = None + self.is_custom = False self.current_map = None self.map_data = None self.ext = None self.ext_format = None - self.mm_current_map = None + self.mm = {} + self.mm_img: Union[Image.Image, None] = None async def red_delete_data_for_user(self, **kwargs): """Nothing to delete""" return + def _path_if_custom(self, custom_custom: bool = None) -> pathlib.Path: + check_value = custom_custom if custom_custom is not None else self.is_custom + if check_value: + return self.custom_map_path + return self.asset_path + async def load_data(self): """ Initial loading of data from bundled_data_path and config """ self.asset_path = bundled_data_path(self) / "assets" self.current_map = await self.config.current_map() + self.is_custom = await self.config.is_custom() if self.current_map: await self.current_map_load() async def current_map_load(self): - map_data_path = self.asset_path / self.current_map / "data.json" + map_path = self._path_if_custom() + map_data_path = map_path / self.current_map / "data.json" with map_data_path.open() as mapdata: self.map_data: dict = json.load(mapdata) self.ext = self.map_data["extension"] @@ -105,11 +125,12 @@ class Conquest(commands.Cog): current_img_path = await self._get_current_map_path() / f"current.{self.ext}" im = Image.open(current_img_path) async with ctx.typing(): - out: Image.Image = await self._composite_regions(im, regions, color) + out: Image.Image = await self._composite_regions(im, regions, color, + self._path_if_custom() / self.current_map) out.save(current_img_path, self.ext_format) await self._send_maybe_zoomed_map(ctx, current_img_path, f"map.{self.ext}") - async def _composite_regions(self, im, regions, color) -> Image.Image: + async def _composite_regions(self, im, regions, color, region_path) -> Image.Image: im2 = Image.new("RGB", im.size, color) loop = asyncio.get_running_loop() @@ -117,20 +138,52 @@ class Conquest(commands.Cog): combined_mask = None for region in regions: mask = Image.open( - self.asset_path / self.current_map / "masks" / f"{region}.{self.ext}" - ).convert("L") + region_path / "masks" / f"{region}.{self.ext}" + ).convert("1") if combined_mask is None: combined_mask = mask else: # combined_mask = ImageChops.logical_or(combined_mask, mask) combined_mask = await loop.run_in_executor( - None, ImageChops.multiply, combined_mask, mask + None, ImageChops.logical_and, combined_mask, mask ) - out = await loop.run_in_executor(None, Image.composite, im, im2, combined_mask) + out = await loop.run_in_executor(None, Image.composite, im, im2, combined_mask.covert("L")) return out + async def _mm_save_map(self, ctx, map_name, target_save): + self.mm["name"] = map_name + if target_save.exists() and target_save.is_dir(): + # This is an overwrite operation + # await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?") + # + # pred = MessagePredicate.yes_or_no(ctx) + # try: + # await self.bot.wait_for("message", check=pred, timeout=30) + # except TimeoutError: + # await ctx.maybe_send_embed("Response timed out, cancelling save") + # return + # if not pred.result: + # return + await ctx.maybe_send_embed("Overwrite currently not supported") + return False + + # This is a new name + target_save.mkdir() + ext = self.mm["extension"] + ext_format = "JPEG" if ext.upper() == "JPG" else ext.upper() + self.mm_img.save(target_save / f"blank.{ext}", ext_format) + + await self._save_mm_data(target_save) + + return True + + async def _save_mm_data(self, target_save): + data_json = target_save / "data.json" + with data_json.open("w+") as dj: + json.dump(self.mm, dj) + @commands.group() async def mapmaker(self, ctx: commands.context): """ @@ -139,39 +192,39 @@ class Conquest(commands.Cog): if ctx.invoked_subcommand is None: pass + @mapmaker.command(name="close") + async def _mapmaker_close(self, ctx: commands.Context): + """Close the currently open map.""" + self.mm = {} + self.mm_img = None + + await ctx.tick() + @mapmaker.command(name="save") async def _mapmaker_save(self, ctx: commands.Context, *, map_name: str): """Save the current map to the specified map name""" - if not self.mm_current_map: + if not self.mm: await ctx.maybe_send_embed("No map currently being worked on") return - if not self.mm_current_map["im"]: + if not self.mm_img: await ctx.maybe_send_embed("No map image to save") return - self.mm_current_map["name"] = map_name - - target_save = self.custom_map_folder / map_name - - if target_save.exists() and target_save.is_dir(): - await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?") - - def check(m): - return ( - m.content.upper() in ["Y", "YES", "N", "NO"] - and m.channel == ctx.channel - and m.author == ctx.author - ) + if self.mm["name"] == map_name: + await ctx.maybe_send_embed("This map already has that name, no reason to save") + return - msg = await self.bot.wait_for("message", check=check) + target_save = self.custom_map_path / map_name - if msg.content.upper() in ["N", "NO"]: - await ctx.send("Cancelled") - return + result = await self._mm_save_map(ctx, map_name, target_save) + if not result: + await ctx.maybe_send_embed("Failed to save to that name") + else: + await ctx.maybe_send_embed(f"Map successfully saved to {target_save}") @mapmaker.command(name="upload") - async def _mapmaker_upload(self, ctx: commands.Context, map_path=""): + async def _mapmaker_upload(self, ctx: commands.Context, map_name: str, map_path=""): """Load a map image to be modified. Upload one with this command or provide a path""" message: discord.Message = ctx.message if not message.attachments and not map_path: @@ -180,8 +233,22 @@ class Conquest(commands.Cog): ) return - if not self.mm_current_map: - self.mm_current_map = self.default_custom_map.copy() + target_save = self.custom_map_path / map_name + + if target_save.exists() and target_save.is_dir(): + await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?") + + pred = MessagePredicate.yes_or_no(ctx) + try: + await self.bot.wait_for("message", check=pred, timeout=30) + except TimeoutError: + await ctx.maybe_send_embed("Response timed out, cancelling save") + return + if not pred.result: + return + + if not self.mm: + self.mm = self.default_custom_map.copy() if map_path: map_path = pathlib.Path(map_path) @@ -190,19 +257,221 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("Map not found at that path") return - self.mm_current_map["im"] = Image.open(map_path) - self.mm_current_map["extension"] = map_path.suffix[1:] + self.mm_img = Image.open(map_path) + self.mm["extension"] = map_path.suffix[1:] - if message.attachments: + elif message.attachments: attch: discord.Attachment = message.attachments[0] - self.mm_current_map["im"] = Image.frombytes( - "RGBA", (attch.width, attch.height), attch.read() - ) + # attch_file = await attch.to_file() + + buffer = BytesIO() + await attch.save(buffer) + + self.mm_img: Image.Image = Image.open(buffer) + + self.mm["extension"] = pathlib.Path(attch.filename).suffix[1:] + else: + # Wait what? + return + + result = await self._mm_save_map(ctx, map_name, target_save) + + if not result: + self.mm = {} + self.mm_img = None + await ctx.maybe_send_embed("Failed to upload to that name") + else: + await ctx.maybe_send_embed(f"Map successfully uploaded to {target_save}") + + @mapmaker.command(name="sample") + async def _mapmaker_sample(self, ctx: commands.Context): + """Print the currently being modified map as a sample""" + if not self.mm: + await ctx.maybe_send_embed("No map currently being worked on") + return + + if not self.mm_img: + await ctx.maybe_send_embed("No map image has been loaded") + return + + async with ctx.typing(): + map_dir = self.custom_map_path / self.mm["name"] + + files = [] + + file1 = discord.File(map_dir / f"blank.{self.mm['extension']}") + + files.append(file1) + + masks_dir = map_dir / "masks" + if masks_dir.exists() and masks_dir.is_dir(): + loop = asyncio.get_running_loop() + current_map = Image.open(map_dir / f"blank.{self.mm['extension']}") + + regions = list(self.mm["regions"].keys()) + fourth = len(regions) // 4 + + current_map = await self._composite_regions( + current_map, regions[:fourth], ImageColor.getrgb("red"), map_dir + ) + current_map = await self._composite_regions( + current_map, regions[fourth: fourth * 2], ImageColor.getrgb("green"), map_dir + ) + current_map = await self._composite_regions( + current_map, regions[fourth * 2: fourth * 3], ImageColor.getrgb("blue"), map_dir + ) + current_map = await self._composite_regions( + current_map, regions[fourth * 3:], ImageColor.getrgb("yellow"), map_dir + ) + + numbers = Image.open(map_dir / "numbers.png").convert("L") + inverted_map = ImageOps.invert(current_map) + current_numbered_img = await loop.run_in_executor( + None, Image.composite, current_map, inverted_map, numbers + ) + + buffer1 = BytesIO() + buffer2 = BytesIO() + + current_map.save(buffer1, "png") + buffer1.seek(0) + current_numbered_img.save(buffer2, "png") + buffer2.seek(0) + + files.append(discord.File(fp=buffer1, filename="colored_map.png")) + files.append(discord.File(fp=buffer2, filename="with_numbers.png")) + + for f in files: + await ctx.send(file=f) @mapmaker.command(name="load") - async def _mapmaker_load(self, ctx: commands.Context, map_name=""): + async def _mapmaker_load(self, ctx: commands.Context, map_name: str): """Load an existing map to be modified.""" - await ctx.maybe_send_embed("WIP") + if self.mm or self.mm_img: + await ctx.maybe_send_embed("There is a current map in progres. Close it first with `[p]mapmaker close`") + return + + map_path = self.custom_map_path / map_name + + if not map_path.exists() or not map_path.is_dir(): + await ctx.maybe_send_embed(f"Map {map_name} not found in {self.custom_map_path}") + return + + maps_json = map_path / "data.json" + + with maps_json.open() as maps: + self.mm = json.load(maps) + + self.mm_img = Image.open(map_path / f"blank.{self.mm['extension']}") + + await ctx.tick() + + @mapmaker.group(name="masks") + async def _mapmaker_masks(self, ctx: commands.Context): + """Base command for managing map masks""" + if ctx.invoked_subcommand is None: + pass + + @_mapmaker_masks.command(name="generate") + async def _mapmaker_masks_generate(self, ctx: commands.Context): + """ + Generate masks for the map + + Currently only works on maps with black borders and white regions. + Non-white regions are ignored (i.e. blue water) + """ + if not self.mm: + await ctx.maybe_send_embed("No map currently being worked on") + return + + if not self.mm_img: + await ctx.maybe_send_embed("No map image to save") + return + + map_dir = self.custom_map_path / self.mm["name"] + masks_dir = map_dir / "masks" + if masks_dir.exists() and masks_dir.is_dir(): + await ctx.maybe_send_embed("Mask folder already exists, delete this before continuing") + return + + # Done by Regioner + # masks_dir.mkdir() + + regioner = Regioner(filename=f"blank.{self.mm['extension']}", filepath=map_dir) + + loop = asyncio.get_running_loop() + with ctx.typing(): + regions = await loop.run_in_executor(None, regioner.execute) + + if not regions: + await ctx.maybe_send_embed("Failed to generate masks") + return + + self.mm["regions"] = regions + self.mm["region_max"] = len(regions) + 1 + + await self._save_mm_data(map_dir) + + await ctx.maybe_send_embed(f"{len(regions)} masks generated into {masks_dir}") + + @_mapmaker_masks.command(name="combine") + async def _mapmaker_masks_combine( + self, ctx: commands.Context, mask_list: Greedy[int], recommended=False + ): + """Generate masks for the map""" + if not self.mm: + await ctx.maybe_send_embed("No map currently being worked on") + return + + if not self.mm_img: + await ctx.maybe_send_embed("No map image to save") + return + + if recommended and mask_list: + await ctx.maybe_send_embed( + "Can't combine recommend masks and a mask list at the same time, pick one" + ) + return + + map_dir = self.custom_map_path / self.mm["name"] + masks_dir = map_dir / "masks" + if not masks_dir.exists() or not masks_dir.is_dir(): + await ctx.maybe_send_embed("There are no masks") + return + + if not recommended: + for mask in mask_list: + m = masks_dir / f"{mask}.png" + if not m.exists(): + await ctx.maybe_send_embed(f"Mask #{mask} does not exist") + return + else: + await ctx.send("Not Implemented") + return + + regioner = Regioner(filename=f"blank.{self.mm['extension']}", filepath=map_dir) + + loop = asyncio.get_running_loop() + lowest, eliminated = await loop.run_in_executor(None, regioner.combine_masks, mask_list) + + if not lowest: + await ctx.maybe_send_embed("Failed to combine masks") + return + + points = [self.mm["regions"][f"{n}"] for n in mask_list] + self.mm["regions"][f"{lowest}"] = get_center(points) + + for key in eliminated: + self.mm["regions"].pop(f"{key}") + + future = await loop.run_in_executor(None, regioner.create_number_mask, self.mm["regions"]) + + if not future: + await ctx.maybe_send_embed( + "Failed to generate number mask, try running this command again" + ) + + await self._save_mm_data(map_dir) @commands.group() async def conquest(self, ctx: commands.Context): @@ -218,7 +487,7 @@ class Conquest(commands.Cog): """ List currently available maps """ - maps_json = self.asset_path / "maps.json" + maps_json = self._path_if_custom() / "maps.json" with maps_json.open() as maps: maps_json = json.load(maps) @@ -339,30 +608,30 @@ class Conquest(commands.Cog): await ctx.tick() @conquest_set.command(name="map") - async def _conquest_set_map(self, ctx: commands.Context, mapname: str, reset: bool = False): + async def _conquest_set_map( + self, ctx: commands.Context, mapname: str, is_custom: bool = False, reset: bool = False + ): """ Select a map from current available maps To add more maps, see the guide (WIP) """ - map_dir = self.asset_path / mapname + check_path = self._path_if_custom(is_custom) + + map_dir = check_path / mapname if not map_dir.exists() or not map_dir.is_dir(): await ctx.maybe_send_embed( - f"Map `{mapname}` was not found in the {self.asset_path} directory" + f"Map `{mapname}` was not found in the {check_path} directory" ) return self.current_map = mapname + self.is_custom = is_custom await self.config.current_map.set(self.current_map) # Save to config too + await self.config.is_custom.set(is_custom) await self.current_map_load() - # map_data_path = self.asset_path / mapname / "data.json" - # with map_data_path.open() as mapdata: - # self.map_data = json.load(mapdata) - # - # self.ext = self.map_data["extension"] - current_map_folder = await self._get_current_map_path() current_map = current_map_folder / f"current.{self.ext}" @@ -374,7 +643,7 @@ class Conquest(commands.Cog): else: if not current_map_folder.exists(): current_map_folder.mkdir() - copyfile(self.asset_path / mapname / f"blank.{self.ext}", current_map) + copyfile(check_path / mapname / f"blank.{self.ext}", current_map) await ctx.tick() @@ -400,7 +669,7 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`") return - current_blank_img = self.asset_path / self.current_map / f"blank.{self.ext}" + current_blank_img = self._path_if_custom() / self.current_map / f"blank.{self.ext}" await self._send_maybe_zoomed_map(ctx, current_blank_img, f"blank_map.{self.ext}") @@ -413,11 +682,11 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`") return - numbers_path = self.asset_path / self.current_map / f"numbers.{self.ext}" + numbers_path = self._path_if_custom() / self.current_map / f"numbers.{self.ext}" if not numbers_path.exists(): await ctx.send( file=discord.File( - fp=self.asset_path / self.current_map / f"numbered.{self.ext}", + fp=self._path_if_custom() / self.current_map / f"numbered.{self.ext}", filename=f"numbered.{self.ext}", ) ) diff --git a/conquest/regioner.py b/conquest/regioner.py index 2f82bfa..1764c04 100644 --- a/conquest/regioner.py +++ b/conquest/regioner.py @@ -1,6 +1,7 @@ import pathlib +from typing import List -from PIL import Image, ImageColor, ImageDraw, ImageFont +from PIL import Image, ImageChops, ImageColor, ImageDraw, ImageFont from PIL.ImageDraw import _color_diff @@ -13,6 +14,10 @@ def get_center(points): return sum(x) / len(points), sum(y) / len(points) +def recommended_combinations(mask_centers): + pass # TODO: Create recommendation algo and test it + + def floodfill(image, xy, value, border=None, thresh=0) -> set: """ Taken and modified from PIL.ImageDraw.floodfill @@ -89,11 +94,13 @@ class Regioner: TODO: Using proper multithreading best practices. TODO: This is iterating over a 2d array with some overlap, you went to school for this Bozo + + TODO: Fails on some maps where borders aren't just black (i.e. water borders vs region borders) """ base_img_path = self.filepath / self.filename if not base_img_path.exists(): - return None + return False masks_path = self.filepath / "masks" @@ -118,7 +125,7 @@ class Regioner: if filled: # Pixels were updated, make them into a mask mask = Image.new("L", base_img.size, 255) for x2, y2 in filled: - mask.putpixel((x2, y2), 0) + mask.putpixel((x2, y2), 0) # TODO: Switch to ImageDraw mask_count += 1 mask = mask.convert("L") @@ -130,13 +137,51 @@ class Regioner: # TODO: save mask_centers - return self.create_number_mask(base_img, mask_centers) + self.create_number_mask(mask_centers) + return mask_centers + + def create_number_mask(self, mask_centers): + base_img_path = self.filepath / self.filename + if not base_img_path.exists(): + return False + + base_img: Image.Image = Image.open(base_img_path).convert("L") - def create_number_mask(self, base_img, mask_centers): number_img = Image.new("L", base_img.size, 255) fnt = ImageFont.load_default() d = ImageDraw.Draw(number_img) for mask_num, center in mask_centers.items(): d.text(center, str(mask_num), font=fnt, fill=0) number_img.save(self.filepath / f"numbers.png", "PNG") - return mask_centers + return True + + def combine_masks(self, mask_list: List[int]): + if not mask_list: + return False, None + + base_img_path = self.filepath / self.filename + if not base_img_path.exists(): + return False, None + + masks_path = self.filepath / "masks" + + if not masks_path.exists(): + return False, None + + base_img: Image.Image = Image.open(base_img_path) + mask = Image.new("1", base_img.size, 1) + + lowest_num = None + eliminated_masks = [] + + for mask_num in mask_list: + if lowest_num is None or mask_num < lowest_num: + lowest_num = mask_num + else: + eliminated_masks.append(mask_num) + + mask2 = Image.open(masks_path / f"{mask_num}.png").convert("1") + mask = ImageChops.logical_and(mask, mask2) + + mask.save(masks_path / f"{lowest_num}.png", "PNG") + return lowest_num, eliminated_masks