diff --git a/conquest/conquest.py b/conquest/conquest.py index abeedf1..a8457a3 100644 --- a/conquest/conquest.py +++ b/conquest/conquest.py @@ -6,14 +6,14 @@ from shutil import copyfile from typing import Optional, Union import discord -from PIL import Image, ImageChops, ImageColor, ImageOps +from PIL import Image, ImageColor, ImageOps from discord.ext.commands import Greedy from redbot.core import Config, commands from redbot.core.bot import Red from redbot.core.data_manager import bundled_data_path, cog_data_path from redbot.core.utils.predicates import MessagePredicate -from conquest.regioner import ConquestMap, Regioner, get_center +from conquest.regioner import ConquestMap, composite_regions class Conquest(commands.Cog): @@ -31,6 +31,10 @@ class Conquest(commands.Cog): "custom": True, } + default_maps_json = { + "maps": [] + } + def __init__(self, bot: Red): super().__init__() self.bot = bot @@ -49,7 +53,7 @@ class Conquest(commands.Cog): if not self.custom_map_path.exists() or not self.custom_map_path.is_dir(): self.custom_map_path.mkdir() with (self.custom_map_path / "maps.json").open("w+") as dj: - json.dump({"maps": []}, dj, sort_keys=True, indent=4) + json.dump(self.default_maps_json.copy(), dj, sort_keys=True, indent=4) self.current_map_folder = self.data_path / "current_maps" if not self.current_map_folder.exists() or not self.current_map_folder.is_dir(): @@ -64,7 +68,6 @@ class Conquest(commands.Cog): self.ext_format = None self.mm: Union[ConquestMap, None] = None - # self.mm_img: Union[Image.Image, None] = None async def red_delete_data_for_user(self, **kwargs): """Nothing to delete""" @@ -90,8 +93,13 @@ class Conquest(commands.Cog): async def current_map_load(self): map_path = self._path_if_custom() map_data_path = map_path / self.current_map / "data.json" - with map_data_path.open() as mapdata: - self.map_data: dict = json.load(mapdata) + try: + with map_data_path.open() as mapdata: + self.map_data: dict = json.load(mapdata) + except FileNotFoundError as e: + print(e) + await self.config.current_map.set(None) + return self.ext = self.map_data["extension"] self.ext_format = "JPEG" if self.ext.upper() == "JPG" else self.ext.upper() @@ -127,46 +135,14 @@ class Conquest(commands.Cog): current_img_path = await self._get_current_map_path() / f"current.{self.ext}" im = Image.open(current_img_path) async with ctx.typing(): - out: Image.Image = await self._composite_regions(im, regions, color, - self._path_if_custom() / self.current_map) + out: Image.Image = await composite_regions( + im, regions, color, self._path_if_custom() / self.current_map + ) out.save(current_img_path, self.ext_format) await self._send_maybe_zoomed_map(ctx, current_img_path, f"map.{self.ext}") - async def _composite_regions(self, im, regions, color, region_path) -> Union[Image.Image, None]: - im2 = Image.new("RGB", im.size, color) - - loop = asyncio.get_running_loop() - - combined_mask = None - for region in regions: - mask = Image.open( - region_path / "masks" / f"{region}.{self.ext}" - ).convert("1") - if combined_mask is None: - combined_mask = mask - else: - # combined_mask = ImageChops.logical_or(combined_mask, mask) - combined_mask = await loop.run_in_executor( - None, ImageChops.logical_and, combined_mask, mask - ) - - if combined_mask is None: # No regions usually - return None - - out = await loop.run_in_executor(None, Image.composite, im, im2, combined_mask.convert("L")) - - return out - - async def _mm_save_map(self, ctx, map_name, target_save): - result = await self.mm.change_name(map_name, target_save) - - if result: - await ctx.maybe_send_embed("Name changed") - - async def _save_mm_data(self, target_save): - data_json = target_save / "data.json" - with data_json.open("w+") as dj: - json.dump(self.mm, dj, sort_keys=True, indent=4) + async def _mm_save_map(self, map_name, target_save): + return await self.mm.change_name(map_name, target_save) @commands.group() async def mapmaker(self, ctx: commands.context): @@ -180,7 +156,6 @@ class Conquest(commands.Cog): async def _mapmaker_close(self, ctx: commands.Context): """Close the currently open map.""" self.mm = None - self.mm_img = None await ctx.tick() @@ -191,17 +166,13 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("No map currently being worked on") return - if not self.mm_img: - await ctx.maybe_send_embed("No map image to save") - return - - if self.mm["name"] == map_name: + if self.mm.name == map_name: await ctx.maybe_send_embed("This map already has that name, no reason to save") return target_save = self.custom_map_path / map_name - result = await self._mm_save_map(ctx, map_name, target_save) + result = await self._mm_save_map(map_name, target_save) if not result: await ctx.maybe_send_embed("Failed to save to that name") else: @@ -233,6 +204,7 @@ class Conquest(commands.Cog): if not self.mm: self.mm = ConquestMap(self.custom_map_path) + self.mm.custom = True if map_path: map_path = pathlib.Path(map_path) @@ -241,8 +213,7 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("Map not found at that path") return - self.mm_img = Image.open(map_path) - self.mm.extension = map_path.suffix[1:] + mm_img = Image.open(map_path) elif message.attachments: attch: discord.Attachment = message.attachments[0] @@ -251,21 +222,26 @@ class Conquest(commands.Cog): buffer = BytesIO() await attch.save(buffer) - self.mm_img: Image.Image = Image.open(buffer) - - self.mm["extension"] = pathlib.Path(attch.filename).suffix[1:] + mm_img: Image.Image = Image.open(buffer) else: # Wait what? return - result = await self._mm_save_map(ctx, map_name, target_save) + result = await self.mm.init_directory(map_name, target_save, mm_img) if not result: self.mm = None - self.mm_img = None await ctx.maybe_send_embed("Failed to upload to that name") - else: - await ctx.maybe_send_embed(f"Map successfully uploaded to {target_save}") + return + + maps_json_path = self.custom_map_path / "maps.json" + with maps_json_path.open("r+") as maps: + map_data = json.load(maps) + map_data["maps"].append(map_name) + maps.seek(0) + json.dump(map_data, maps, sort_keys=True, indent=4) + + await ctx.maybe_send_embed(f"Map successfully uploaded to {target_save}") @mapmaker.command(name="sample") async def _mapmaker_sample(self, ctx: commands.Context): @@ -274,65 +250,20 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("No map currently being worked on") return - if not self.mm_img: - await ctx.maybe_send_embed("No map image has been loaded") - return - async with ctx.typing(): - map_dir = self.custom_map_path / self.mm["name"] - files = [] - - file1 = discord.File(map_dir / f"blank.{self.mm['extension']}") - - files.append(file1) - - masks_dir = map_dir / "masks" - if masks_dir.exists() and masks_dir.is_dir(): - loop = asyncio.get_running_loop() - current_map = Image.open(map_dir / f"blank.{self.mm['extension']}") - - regions = list(self.mm["regions"].keys()) - fourth = len(regions) // 4 - - current_map = await self._composite_regions( - current_map, regions[:fourth], ImageColor.getrgb("red"), map_dir - ) - current_map = await self._composite_regions( - current_map, regions[fourth: fourth * 2], ImageColor.getrgb("green"), map_dir - ) - current_map = await self._composite_regions( - current_map, regions[fourth * 2: fourth * 3], ImageColor.getrgb("blue"), map_dir - ) - current_map = await self._composite_regions( - current_map, regions[fourth * 3:], ImageColor.getrgb("yellow"), map_dir - ) - - numbers = Image.open(map_dir / "numbers.png").convert("L") - inverted_map = ImageOps.invert(current_map) - current_numbered_img = await loop.run_in_executor( - None, Image.composite, current_map, inverted_map, numbers - ) - - buffer1 = BytesIO() - buffer2 = BytesIO() - - current_map.save(buffer1, "png") - buffer1.seek(0) - current_numbered_img.save(buffer2, "png") - buffer2.seek(0) - - files.append(discord.File(fp=buffer1, filename="colored_map.png")) - files.append(discord.File(fp=buffer2, filename="with_numbers.png")) + files = await self.mm.get_sample() for f in files: - await ctx.send(file=f) + await ctx.send(file=discord.File(f)) @mapmaker.command(name="load") async def _mapmaker_load(self, ctx: commands.Context, map_name: str): """Load an existing map to be modified.""" - if self.mm or self.mm_img: - await ctx.maybe_send_embed("There is a current map in progres. Close it first with `[p]mapmaker close`") + if self.mm: + await ctx.maybe_send_embed( + "There is a current map in progress. Close it first with `[p]mapmaker close`" + ) return map_path = self.custom_map_path / map_name @@ -341,12 +272,8 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed(f"Map {map_name} not found in {self.custom_map_path}") return - maps_json = map_path / "data.json" - - with maps_json.open() as maps: - self.mm = json.load(maps) - - self.mm_img = Image.open(map_path / f"blank.{self.mm['extension']}") + self.mm = ConquestMap(map_path) + await self.mm.load_data() await ctx.tick() @@ -368,34 +295,18 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("No map currently being worked on") return - if not self.mm_img: - await ctx.maybe_send_embed("No map image to save") - return - - map_dir = self.custom_map_path / self.mm["name"] - masks_dir = map_dir / "masks" + masks_dir = self.mm.masks_path() if masks_dir.exists() and masks_dir.is_dir(): await ctx.maybe_send_embed("Mask folder already exists, delete this before continuing") return - # Done by Regioner - # masks_dir.mkdir() - - regioner = Regioner(filename=f"blank.{self.mm['extension']}", filepath=map_dir) - - loop = asyncio.get_running_loop() with ctx.typing(): - regions = await loop.run_in_executor(None, regioner.execute) + regions = await self.mm.generate_masks() if not regions: await ctx.maybe_send_embed("Failed to generate masks") return - self.mm["regions"] = regions - self.mm["region_max"] = len(regions) + 1 - - await self._save_mm_data(map_dir) - await ctx.maybe_send_embed(f"{len(regions)} masks generated into {masks_dir}") @_mapmaker_masks.command(name="combine") @@ -407,24 +318,19 @@ class Conquest(commands.Cog): await ctx.maybe_send_embed("No map currently being worked on") return - if not self.mm_img: - await ctx.maybe_send_embed("No map image to save") - return - if recommended and mask_list: await ctx.maybe_send_embed( "Can't combine recommend masks and a mask list at the same time, pick one" ) return - map_dir = self.custom_map_path / self.mm["name"] - masks_dir = map_dir / "masks" + masks_dir = self.mm.masks_path() if not masks_dir.exists() or not masks_dir.is_dir(): await ctx.maybe_send_embed("There are no masks") return if not recommended: - for mask in mask_list: + for mask in mask_list: # TODO: switch to self.mm.regions intersection of sets m = masks_dir / f"{mask}.png" if not m.exists(): await ctx.maybe_send_embed(f"Mask #{mask} does not exist") @@ -433,29 +339,13 @@ class Conquest(commands.Cog): await ctx.send("Not Implemented") return - regioner = Regioner(filename=f"blank.{self.mm['extension']}", filepath=map_dir) - - loop = asyncio.get_running_loop() - lowest, eliminated = await loop.run_in_executor(None, regioner.combine_masks, mask_list) - - if not lowest: - await ctx.maybe_send_embed("Failed to combine masks") - return - - points = [self.mm["regions"][f"{n}"]["center"] for n in mask_list] - self.mm["regions"][f"{lowest}"]["center"] = get_center(points) - - for key in eliminated: - self.mm["regions"].pop(f"{key}") - - future = await loop.run_in_executor(None, regioner.create_number_mask, self.mm["regions"]) - - if not future: + result = await self.mm.combine_masks(mask_list) + if not result: await ctx.maybe_send_embed( - "Failed to generate number mask, try running this command again" + "Failed to combine masks, try the command again or check log for errors" ) - - await self._save_mm_data(map_dir) + return + await ctx.tick() @commands.group() async def conquest(self, ctx: commands.Context): diff --git a/conquest/regioner.py b/conquest/regioner.py index 6025c5b..ba76117 100644 --- a/conquest/regioner.py +++ b/conquest/regioner.py @@ -1,11 +1,38 @@ +import asyncio import json import pathlib -from typing import List +import shutil +from io import BytesIO +from typing import List, Union -from PIL import Image, ImageChops, ImageColor, ImageDraw, ImageFont +from PIL import Image, ImageChops, ImageColor, ImageDraw, ImageFont, ImageOps from PIL.ImageDraw import _color_diff +async def composite_regions(im, regions, color, masks_path) -> Union[Image.Image, None]: + im2 = Image.new("RGB", im.size, color) + + loop = asyncio.get_running_loop() + + combined_mask = None + for region in regions: + mask = Image.open(masks_path / f"{region}.png").convert("1") + if combined_mask is None: + combined_mask = mask + else: + # combined_mask = ImageChops.logical_or(combined_mask, mask) + combined_mask = await loop.run_in_executor( + None, ImageChops.logical_and, combined_mask, mask + ) + + if combined_mask is None: # No regions usually + return None + + out = await loop.run_in_executor(None, Image.composite, im, im2, combined_mask.convert("L")) + + return out + + def get_center(points): """ Taken from https://stackoverflow.com/questions/4355894/how-to-get-center-of-set-of-points-using-python @@ -81,17 +108,15 @@ def floodfill(image, xy, value, border=None, thresh=0) -> set: class ConquestMap: - def __init__(self, path): + def __init__(self, path: pathlib.Path): self.path = path self.name = None self.custom = None self.region_max = None - self.extension = None self.regions = {} async def change_name(self, new_name: str, new_path: pathlib.Path): - self.name = new_name if new_path.exists() and new_path.is_dir(): # This is an overwrite operation # await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?") @@ -108,10 +133,13 @@ class ConquestMap: # This is a new name new_path.mkdir() - ext_format = "JPEG" if self.extension.upper() == "JPG" else self.extension.upper() - self.mm_img.save(new_path / f"blank.{self.extension}", ext_format) - await self._save_mm_data(target_save) + shutil.copytree(self.path, new_path) + + self.name = new_name + self.path = new_path + + await self.save_data() return True @@ -122,7 +150,7 @@ class ConquestMap: return self.path / "data.json" def blank_path(self): - return self.path / "blank.png" + return self.path / "blank.png" # Everything is png now def numbers_path(self): return self.path / "numbers.png" @@ -130,11 +158,26 @@ class ConquestMap: def numbered_path(self): return self.path / "numbered.png" - def save_data(self): + async def init_directory(self, name: str, path: pathlib.Path, image: Image.Image): + if not path.exists() or not path.is_dir(): + path.mkdir() + + self.name = name + self.path = path + + await self.save_data() + + image.save(self.blank_path(), "PNG") + + return True + + async def save_data(self): + to_save = self.__dict__.copy() + to_save.pop("path") with self.data_path().open("w+") as dp: - json.dump(self.__dict__, dp, sort_keys=True, indent=4) + json.dump(to_save, dp, sort_keys=True, indent=4) - def load_data(self): + async def load_data(self): with self.data_path().open() as dp: data = json.load(dp) @@ -142,24 +185,144 @@ class ConquestMap: self.custom = data["custom"] self.region_max = data["region_max"] - self.regions = {key: Region(number=key, host=self, **data) for key, data in data["regions"].items()} + self.regions = {key: Region(**data) for key, data in data["regions"].items()} - def save_region(self, region): + async def save_region(self, region): if not self.custom: return False pass + async def generate_masks(self): + regioner = Regioner(filename="blank.png", filepath=self.path) + loop = asyncio.get_running_loop() + regions = await loop.run_in_executor(None, regioner.execute) + + if not regions: + return regions + + self.regions = regions + self.region_max = len(regions) + 1 + + await self.save_data() + + async def create_number_mask(self): + regioner = Regioner(filename="blank.png", filepath=self.path) + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, regioner.create_number_mask, self.regions) + + async def combine_masks(self, mask_list: List[int]): + loop = asyncio.get_running_loop() + lowest, eliminated = await loop.run_in_executor(None, self._img_combine_masks, mask_list) + + if not lowest: + return lowest + + elim_regions = [self.regions[n] for n in eliminated] + lowest_region = self.regions[lowest] + + # points = [self.mm["regions"][f"{n}"]["center"] for n in mask_list] + # + # points = [(r.center, r.weight) for r in elim_regions] + + weighted_points = [r.center for r in elim_regions for _ in range(r.weight)] + + lowest_region.center = get_center(weighted_points) + + for key in eliminated: + self.regions.pop(key) + # self.mm["regions"].pop(f"{key}") + + if self.region_max in eliminated: # Max region has changed + self.region_max = max(self.regions.keys()) + + await self.create_number_mask() + + await self.save_data() + + def _img_combine_masks(self, mask_list: List[int]): + if not mask_list: + return False, None + + if not self.blank_path().exists(): + return False, None + + if not self.masks_path().exists(): + return False, None + + base_img: Image.Image = Image.open(self.blank_path()) + mask = Image.new("1", base_img.size, 1) + + lowest_num = None + eliminated_masks = [] + + for mask_num in mask_list: + if lowest_num is None or mask_num < lowest_num: + lowest_num = mask_num + else: + eliminated_masks.append(mask_num) + + mask2 = Image.open(self.masks_path() / f"{mask_num}.png").convert("1") + mask = ImageChops.logical_and(mask, mask2) + + mask.save(self.masks_path() / f"{lowest_num}.png", "PNG") + return lowest_num, eliminated_masks + + async def get_sample(self): + files = [self.blank_path()] + + masks_dir = self.masks_path() + if masks_dir.exists() and masks_dir.is_dir(): + loop = asyncio.get_running_loop() + current_map = Image.open(self.blank_path()) + + regions = list(self.regions.keys()) + fourth = len(regions) // 4 + + current_map = await composite_regions( + current_map, regions[:fourth], ImageColor.getrgb("red"), self.masks_path() + ) + current_map = await composite_regions( + current_map, + regions[fourth: fourth * 2], + ImageColor.getrgb("green"), + self.masks_path(), + ) + current_map = await composite_regions( + current_map, + regions[fourth * 2: fourth * 3], + ImageColor.getrgb("blue"), + self.masks_path(), + ) + current_map = await composite_regions( + current_map, regions[fourth * 3:], ImageColor.getrgb("yellow"), self.masks_path() + ) + + numbers = Image.open(self.numbers_path()).convert("L") + inverted_map = ImageOps.invert(current_map) + current_numbered_img = await loop.run_in_executor( + None, Image.composite, current_map, inverted_map, numbers + ) + + buffer1 = BytesIO() + buffer2 = BytesIO() + + current_map.save(buffer1, "png") + buffer1.seek(0) + current_numbered_img.save(buffer2, "png") + buffer2.seek(0) + + files.append(buffer1) + files.append(buffer2) + + return files + class Region: - def __init__(self, number, host: ConquestMap, center, **kwargs): - self.number = number - self.host = host + def __init__(self, center, weight, **kwargs): self.center = center + self.weight = weight self.data = kwargs - def save(self): - self.host.save_region(self) - class Regioner: def __init__( @@ -196,7 +359,7 @@ class Regioner: already_processed = set() mask_count = 0 - mask_centers = {} + regions = {} for y1 in range(base_img.height): for x1 in range(base_img.width): @@ -213,16 +376,18 @@ class Regioner: mask = mask.convert("L") mask.save(masks_path / f"{mask_count}.png", "PNG") - mask_centers[mask_count] = {"center": get_center(filled), "point_count": len(filled)} + regions[mask_count] = Region( + center=get_center(filled), weight=len(filled) + ) already_processed.update(filled) # TODO: save mask_centers - self.create_number_mask(mask_centers) - return mask_centers + self.create_number_mask(regions) + return regions - def create_number_mask(self, mask_centers): + def create_number_mask(self, regions): base_img_path = self.filepath / self.filename if not base_img_path.exists(): return False @@ -232,39 +397,9 @@ class Regioner: number_img = Image.new("L", base_img.size, 255) fnt = ImageFont.load_default() d = ImageDraw.Draw(number_img) - for mask_num, data in mask_centers.items(): - center = data["center"] - d.text(center, str(mask_num), font=fnt, fill=0) + for region_num, region in regions.items(): + center = region.center + text = getattr(region, "center", str(region_num)) + d.text(center, text, font=fnt, fill=0) number_img.save(self.filepath / f"numbers.png", "PNG") return True - - def combine_masks(self, mask_list: List[int]): - if not mask_list: - return False, None - - base_img_path = self.filepath / self.filename - if not base_img_path.exists(): - return False, None - - masks_path = self.filepath / "masks" - - if not masks_path.exists(): - return False, None - - base_img: Image.Image = Image.open(base_img_path) - mask = Image.new("1", base_img.size, 1) - - lowest_num = None - eliminated_masks = [] - - for mask_num in mask_list: - if lowest_num is None or mask_num < lowest_num: - lowest_num = mask_num - else: - eliminated_masks.append(mask_num) - - mask2 = Image.open(masks_path / f"{mask_num}.png").convert("1") - mask = ImageChops.logical_and(mask, mask2) - - mask.save(masks_path / f"{lowest_num}.png", "PNG") - return lowest_num, eliminated_masks