# Fox-V3/conquest/regioner.py
# (676 lines, 22 KiB)

import asyncio
import json
import logging
import pathlib
import random
import shutil
from io import BytesIO
from typing import List, Union, Optional
import numpy
from PIL import Image, ImageChops, ImageColor, ImageDraw, ImageFont, ImageOps
from PIL.ImageDraw import _color_diff
from redbot.core.utils import AsyncIter
# Module-level logger for this file.
log = logging.getLogger("red.fox_v3.conquest.regioner")
# Font used by create_number_mask for region labels; when None, that function
# falls back to ImageFont.load_default().
MAP_FONT: Optional[ImageFont.ImageFont] = None
# Pillow image mode every region mask is converted to when it is opened.
MASK_MODE = "1" # "L" for 8 bit masks, "1" for 1 bit masks
async def composite_regions(im, regions, color, masks_path) -> Union[Image.Image, None]:
    """Paint the given regions of ``im`` with a solid ``color``.

    Each region's mask is read from ``masks_path / "<region>.png"`` and the
    masks are merged with ``ImageChops.logical_and``. The merged mask is then
    handed to ``Image.composite`` together with ``im`` and a solid-colour
    image of the same size.

    :param im: Image to colour.
    :param regions: Iterable of region identifiers (used as mask file stems).
    :param color: Fill colour for the solid image.
    :param masks_path: Directory containing the mask images.
    :return: The composited image, or ``None`` when ``regions`` is empty.
    """
    solid_fill = Image.new("RGB", im.size, color)
    event_loop = asyncio.get_running_loop()

    merged_mask = None
    for region_id in regions:
        region_mask = Image.open(masks_path / f"{region_id}.png").convert(MASK_MODE)
        if merged_mask is not None:
            # The pixel-wise merge runs in the default executor.
            region_mask = await event_loop.run_in_executor(
                None, ImageChops.logical_and, merged_mask, region_mask
            )
        merged_mask = region_mask

    if merged_mask is None:  # No regions usually
        return None

    return await event_loop.run_in_executor(
        None, Image.composite, im, solid_fill, merged_mask
    )
def get_center(points):
    """Return the arithmetic mean ``(x, y)`` of a sized collection of points.

    Adapted from
    https://stackoverflow.com/questions/4355894/how-to-get-center-of-set-of-points-using-python

    :param points: Sized iterable of indexable points (``p[0]`` is x, ``p[1]`` is y).
    :return: Tuple ``(mean_x, mean_y)``.
    :raises ZeroDivisionError: If ``points`` is empty.
    """
    count = len(points)
    total_x = 0
    total_y = 0
    for point in points:
        total_x += point[0]
        total_y += point[1]
    return total_x / count, total_y / count
def recommended_combinations(mask_centers):
    """Placeholder for a region-combination recommender; currently does nothing.

    :param mask_centers: Unused for now.
    :return: Always ``None``.
    """
    # TODO: Create recommendation algo and test it
    return None
def chunker(seq, size):
    """Lazily yield consecutive slices of ``seq`` holding at most ``size`` items.

    Based on https://stackoverflow.com/a/434328

    :param seq: Sliceable, sized sequence.
    :param size: Maximum length of each slice; ``0`` raises ``ValueError``
        immediately because it is used as the ``range`` step.
    :return: Generator of slices of ``seq``.
    """
    starts = range(0, len(seq), size)
    return (seq[start : start + size] for start in starts)
def floodfill(image, xy, value, border=None, thresh=0) -> set:
    """
    Flood-fill a bounded region and report which pixels were filled.

    Taken and modified from PIL.ImageDraw.floodfill (based on an
    implementation by Eric S. Raymond, amended by yo1995 @20180806).

    :param image: Target image; it is modified in place.
    :param xy: Seed position (a 2-item coordinate tuple).
    :param value: Fill color.
    :param border: Optional border value. If given, the region consists of
        pixels with a color different from both the border color and the
        fill color. If not given, the region consists of pixels whose
        difference from the seed pixel's color is within ``thresh``.
    :param thresh: Optional threshold value which specifies a maximum
        tolerable difference of a pixel value from the 'background' in
        order for it to be replaced.
    :return: Set of ``(x, y)`` coordinates that were filled. Empty when the
        seed lies outside the image or already has the fill color.
    """
    pixel = image.load()
    seed_x, seed_y = xy
    try:
        background = pixel[seed_x, seed_y]
        if _color_diff(value, background) <= thresh:
            return set()  # seed point already has fill color
        pixel[seed_x, seed_y] = value
    except (ValueError, IndexError):
        return set()  # seed point outside image

    # Only the current and previous frontiers are remembered as "seen",
    # which keeps memory consumption low.
    frontier = {(seed_x, seed_y)}
    seen = set()
    filled_pixels = set()
    while frontier:
        filled_pixels |= frontier
        next_frontier = set()
        for cx, cy in frontier:  # 4 adjacent method
            for neighbour in ((cx + 1, cy), (cx - 1, cy), (cx, cy + 1), (cx, cy - 1)):
                nx, ny = neighbour
                # Skip negative coordinates and pixels already processed
                if nx < 0 or ny < 0 or neighbour in seen:
                    continue
                try:
                    current = pixel[nx, ny]
                except (ValueError, IndexError):
                    continue  # outside the image
                seen.add(neighbour)
                if border is None:
                    should_fill = _color_diff(current, background) <= thresh
                else:
                    should_fill = current not in [value, border]
                if should_fill:
                    pixel[nx, ny] = value
                    next_frontier.add(neighbour)
        seen = frontier  # discard pixels processed
        frontier = next_frontier
    return filled_pixels  # Modified to return the filled pixels
def _text_size(font, text):
    """Return the ``(width, height)`` used to centre ``text`` drawn with ``font``.

    ``ImageFont.getsize`` was removed in Pillow 10. When it is missing, the
    right and bottom edges of ``getbbox`` are used as its replacement.
    """
    legacy_getsize = getattr(font, "getsize", None)
    if legacy_getsize is not None:
        return legacy_getsize(text)
    _left, _top, right, bottom = font.getbbox(text)
    return right, bottom


def create_number_mask(regions, filepath, filename):
    """Render the region labels of a map into three greyscale PNG files.

    All three images have the size of ``filepath / filename`` and start out
    white (255). For every region its label is centred on ``region.center``:

    * ``numbers.png``            - black label text.
    * ``numbers_background.png`` - a black rectangle behind each label.
    * ``numbers2.png``           - the black rectangle with the label in white.

    :param regions: Mapping of region number to an object with a ``center``
        attribute. The label is the object's ``name`` attribute if it has
        one, otherwise the region number.
    :param filepath: Directory holding the base image; outputs go here too.
    :param filename: File name of the base image inside ``filepath``.
    :return: ``False`` if the base image does not exist, otherwise ``True``.
    """
    base_img_path = filepath / filename
    if not base_img_path.exists():
        return False

    # Only the dimensions are needed, so release the file handle right away.
    with Image.open(base_img_path) as base_img:
        size = base_img.size

    number_img = Image.new("L", size, 255)
    background_img = Image.new("L", size, 255)
    number2_img = Image.new("L", size, 255)

    fnt = ImageFont.load_default() if MAP_FONT is None else MAP_FONT

    d = ImageDraw.Draw(number_img)
    d2 = ImageDraw.Draw(background_img)
    d3 = ImageDraw.Draw(number2_img)

    for region_num, region in regions.items():
        text = getattr(region, "name", str(region_num))

        w1, h1 = region.center
        w2, h2 = _text_size(fnt, text)

        text_origin = (w1 - (w2 / 2), h1 - (h2 / 2))
        # Box behind the label; note its top edge sits 5 px below the text origin.
        box = (w1 - (w2 / 2) - 1, h1 - (h2 / 2) + 5, w1 + (w2 / 2) - 1, h1 + (h2 / 2))

        d2.rectangle(box, fill=0)
        d3.rectangle(box, fill=0)
        d3.text(text_origin, text, font=fnt, fill=255)
        d.text(text_origin, text, font=fnt, fill=0)

    number_img.save(filepath / "numbers.png", "PNG")
    background_img.save(filepath / "numbers_background.png", "PNG")
    number2_img.save(filepath / "numbers2.png", "PNG")
    return True
class ConquestMap:
    """A Conquest map stored as a directory of images plus a ``data.json`` file.

    ``load_data`` fills in the attributes from ``data.json``; the ``*_path``
    helpers return the locations of the individual files inside ``self.path``.
    """

    def __init__(self, path: pathlib.Path):
        self.path = path

        self.name = None
        self.custom = None
        self.region_max = None
        self.regions = {}  # region number (int) -> Region

    def masks_path(self):
        """Directory holding one ``<region number>.png`` mask per region."""
        return self.path / "masks"

    def data_path(self):
        """JSON file with the map's name, flags and region data."""
        return self.path / "data.json"

    def blank_path(self):
        """The uncoloured base image of the map."""
        return self.path / "blank.png"  # Everything is png now

    def numbers_path(self):
        """Label image with black text on white."""
        return self.path / "numbers.png"

    def numbered_path(self):
        """Location of ``numbered.png`` inside the map directory."""
        return self.path / "numbered.png"

    def numbers_background_path(self):
        """Mask image with a black box behind every label."""
        return self.path / "numbers_background.png"

    def numbers2_path(self):
        """Label image with white text inside black boxes."""
        return self.path / "numbers2.png"

    def load_data(self):
        """Populate name, custom, region_max and regions from ``data.json``."""
        with self.data_path().open() as dp:
            data = json.load(dp)

        self.name = data.get("name")
        self.custom = data.get("custom")
        self.region_max = data.get("region_max")

        # JSON object keys are strings; regions are keyed by int in memory.
        self.regions = {
            int(key): Region(**region_data)
            for key, region_data in data.get("regions", {}).items()
        }

    async def create_number_mask(self):
        """Regenerate the label images for ``blank.png`` in the default executor."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, create_number_mask, self.regions, self.path, "blank.png"
        )

    def _img_combine_masks(self, mask_list: List[int]):
        """Merge the masks of several regions into a single mask image.

        :param mask_list: Region numbers to merge; duplicates are ignored.
        :return: ``(lowest_num, eliminated_masks, mask)`` where ``lowest_num``
            is the smallest region number, ``eliminated_masks`` lists the
            other numbers in input order and ``mask`` is the merged image.
            ``(False, None, None)`` when fewer than two distinct regions are
            given or the blank image / masks directory is missing.
        """
        if not mask_list:
            return False, None, None

        # A repeated number must not end up as both the survivor and an
        # eliminated region, so drop duplicates while keeping input order.
        unique_nums = list(dict.fromkeys(mask_list))
        if len(unique_nums) < 2:
            return False, None, None

        if not self.blank_path().exists():
            return False, None, None

        if not self.masks_path().exists():
            return False, None, None

        # Only the size of the base image is needed; close the file promptly.
        with Image.open(self.blank_path()) as base_img:
            size = base_img.size
        mask = Image.new(MASK_MODE, size, 1)

        lowest_num = min(unique_nums)
        eliminated_masks = [num for num in unique_nums if num != lowest_num]

        for mask_num in unique_nums:
            mask2 = Image.open(self.masks_path() / f"{mask_num}.png").convert(MASK_MODE)
            mask = ImageChops.logical_and(mask, mask2)

        return lowest_num, eliminated_masks, mask

    @staticmethod
    def _png_buffer(image):
        """Encode ``image`` as PNG into a BytesIO rewound to the start."""
        buffer = BytesIO()
        image.save(buffer, "png")
        buffer.seek(0)
        return buffer

    async def sample_region(self, region: int, include_numbered=False):
        """Return PNG buffers showing a single region highlighted in red.

        :param region: Region number to highlight.
        :param include_numbered: Also append the labelled version of the image.
        :return: List of BytesIO objects; empty if ``region`` is unknown.
        """
        if region not in self.regions:
            return []

        current_map = Image.open(self.blank_path())
        current_map = await composite_regions(
            current_map, [region], ImageColor.getrgb("red"), self.masks_path()
        )

        files = [self._png_buffer(current_map)]

        if include_numbered:
            current_numbered_img = await self.get_numbered(current_map)
            files.append(self._png_buffer(current_numbered_img))

        return files

    async def get_sample(self, region=None):
        """Build sample images of the map.

        With ``region`` given, that region (if known) is painted red.
        Otherwise the regions are shuffled and split into four groups painted
        red, green, blue and yellow.

        :return: A list starting with the blank image path. When the masks
            directory exists, the coloured map and its labelled version
            follow as PNG BytesIO buffers.
        """
        files = [self.blank_path()]

        masks_dir = self.masks_path()
        if not (masks_dir.exists() and masks_dir.is_dir()):
            return files

        current_map = Image.open(self.blank_path())

        if region is not None:
            if region in self.regions:
                current_map = await composite_regions(
                    current_map, [region], ImageColor.getrgb("red"), masks_dir
                )
        else:
            regions = list(self.regions.keys())
            random.shuffle(regions)  # random lets goo

            fourth = len(regions) // 4
            color_groups = (
                ("red", regions[:fourth]),
                ("green", regions[fourth : fourth * 2]),
                ("blue", regions[fourth * 2 : fourth * 3]),
                ("yellow", regions[fourth * 3 :]),
            )
            for color_name, group in color_groups:
                painted = await composite_regions(
                    current_map, group, ImageColor.getrgb(color_name), masks_dir
                )
                # composite_regions returns None for an empty group (always
                # the case for some groups when there are fewer than four
                # regions); keep the map painted so far in that case.
                if painted is not None:
                    current_map = painted

        current_numbered_img = await self.get_numbered(current_map)

        files.append(self._png_buffer(current_map))
        files.append(self._png_buffer(current_numbered_img))

        return files

    async def get_blank_numbered_file(self):
        """Return the blank map with labels applied, as a PNG BytesIO."""
        im = await self.get_numbered(Image.open(self.blank_path()))
        return self._png_buffer(im)

    async def get_numbered(self, current_map):
        """Apply region labels to ``current_map`` using the boxed-label style."""
        return await self.get_numbered_with_background(current_map)

    async def get_inverted_numbered(self, current_map):
        """Label ``current_map`` by inverting its colours under the label text."""
        loop = asyncio.get_running_loop()
        numbers = Image.open(self.numbers_path()).convert("L")

        inverted_map = ImageOps.invert(current_map)

        current_numbered_img = await loop.run_in_executor(
            None, Image.composite, current_map, inverted_map, numbers
        )
        return current_numbered_img

    async def get_numbered_with_background(self, current_map):
        """Label ``current_map`` with white text on black boxes.

        ``numbers_background.png`` is the mask and ``numbers2.png`` supplies
        the pixels used where that mask is black.
        """
        loop = asyncio.get_running_loop()
        current_map = current_map.convert("RGBA")
        numbers_mask = Image.open(self.numbers_background_path()).convert("L")
        numbers_background = Image.open(self.numbers2_path()).convert("RGB")

        current_numbered_img = await loop.run_in_executor(
            None, Image.composite, current_map, numbers_background, numbers_mask
        )

        return current_numbered_img
class MapMaker(ConquestMap):
    """ConquestMap extended with the editing operations used to build a map."""

    async def change_name(self, new_name: str, new_path: pathlib.Path):
        """Copy the map to ``new_path`` and continue working on the copy.

        :param new_name: New map name.
        :param new_path: Directory to copy the map into; must not exist yet.
        :return: ``True`` on success. If ``new_path`` is an existing
            directory, the tuple ``(False, "Overwrite currently not supported")``.
            NOTE(review): that tuple is truthy, so callers must not rely on a
            plain truth test of the result.
        """
        if new_path.exists() and new_path.is_dir():
            # TODO: support overwriting after asking the user for confirmation
            return False, "Overwrite currently not supported"

        # This is a new name. copytree creates new_path itself and raises
        # FileExistsError if the destination already exists, so the directory
        # must not be created beforehand.
        shutil.copytree(self.path, new_path)

        self.custom = True  # If this wasn't a custom map, it is now

        self.name = new_name
        self.path = new_path

        await self.save_data()

        return True

    async def generate_masks(self):
        """Run Regioner on ``blank.png`` and store the regions it finds.

        :return: The Regioner result: a falsy value when nothing was produced
            (nothing is saved then), otherwise the new regions dict.
        """
        regioner = Regioner(filename="blank.png", filepath=self.path)
        loop = asyncio.get_running_loop()
        regions = await loop.run_in_executor(None, regioner.execute)

        if not regions:
            return regions

        self.regions = regions
        # NOTE(review): every other method sets region_max to the highest
        # region number, while this stores one more than the count - confirm
        # which meaning is intended.
        self.region_max = len(regions) + 1

        await self.save_data()
        return regions

    async def combine_masks(self, mask_list: List[int]):
        """Merge several regions into the lowest-numbered one.

        The merged mask replaces the surviving region's mask file, its center
        becomes the weight-weighted mean of all merged centers, its weight
        the sum of their weights, and the other regions are removed from
        ``self.regions``. Label images and ``data.json`` are rewritten.

        :param mask_list: Region numbers to merge.
        :return: The surviving region number, or a falsy value if the merge
            could not be done (bad input, missing files, unknown region).
        """
        loop = asyncio.get_running_loop()
        lowest, eliminated, mask = await loop.run_in_executor(
            None, self._img_combine_masks, mask_list
        )

        if not lowest:
            return lowest

        # Repeated numbers in the input must neither double-count a region
        # nor remove the survivor.
        eliminated = [num for num in dict.fromkeys(eliminated) if num != lowest]

        try:
            elim_regions = [self.regions[n] for n in eliminated]
            lowest_region = self.regions[lowest]
        except KeyError:
            return False

        mask.save(self.masks_path() / f"{lowest}.png", "PNG")

        # Weighted mean of the centers, computed directly rather than by
        # repeating each center ``weight`` times in a list.
        merged = [*elim_regions, lowest_region]
        total_weight = sum(r.weight for r in merged)
        lowest_region.center = (
            sum(r.center[0] * r.weight for r in merged) / total_weight,
            sum(r.center[1] * r.weight for r in merged) / total_weight,
        )
        lowest_region.weight = total_weight

        for key in eliminated:
            self.regions.pop(key)

        if self.region_max in eliminated:  # Max region has changed
            self.region_max = max(self.regions.keys())

        await self.create_number_mask()

        await self.save_data()

        return lowest

    async def delete_masks(self, mask_list):
        """Remove regions from the map data.

        Only ``self.regions`` is changed here; mask files stay on disk until
        ``prune_masks`` runs.

        :param mask_list: Region numbers to remove.
        :return: ``mask_list`` on success, ``False`` (with nothing removed)
            if any of the numbers is not a known region.
        """
        keys = list(mask_list)
        # Validate first so a bad number cannot leave a half-applied deletion.
        if any(key not in self.regions for key in keys):
            return False

        for key in keys:
            self.regions.pop(key, None)  # None default tolerates repeated numbers

        if self.region_max in keys:  # Max region has changed
            # 0 when the last region was deleted
            self.region_max = max(self.regions, default=0)

        await self.create_number_mask()

        await self.save_data()

        return mask_list

    async def save_data(self):
        """Write name, custom, region_max and all regions to ``data.json``."""
        to_save = {
            "name": self.name,
            "custom": self.custom,
            "region_max": self.region_max,
            "regions": {num: r.get_json() for num, r in self.regions.items()},
        }
        with self.data_path().open("w+") as dp:
            json.dump(to_save, dp, sort_keys=True, indent=4)

    async def init_directory(self, name: str, path: pathlib.Path, image: Image.Image):
        """Create the map directory with its ``data.json`` and ``blank.png``.

        :param name: Map name.
        :param path: Directory for the map; created if it is not a directory yet.
        :param image: Base image, saved as ``blank.png``.
        :return: ``True``.
        """
        if not path.exists() or not path.is_dir():
            path.mkdir()

        self.name = name
        self.path = path

        await self.save_data()

        image.save(self.blank_path(), "PNG")

        return True

    async def recalculate_region(self, regions=None):
        """Recompute center and weight of regions from their mask pixels.

        :param regions: Region numbers to recalculate; all regions if ``None``.
        :raises KeyError: If a given number is not a known region.
        """
        targets = list(self.regions) if regions is None else regions

        async for num in AsyncIter(targets):
            r = self.regions[num]
            points = await self.get_points_from_mask(num)
            r.center = get_center(points)
            r.weight = len(points)

        await self.save_data()

    async def sort_regions(self, fast_sort=True):
        """Renumber regions from 1 in topmost-then-leftmost order.

        Each region is ranked by its topmost pixel, using the leftmost pixel
        of that row to break ties. Mask files are renamed to match.

        :param fast_sort: Must be true; the alternative is not implemented.
        :raises NotImplementedError: If ``fast_sort`` is false.
        """
        if not fast_sort:  # Chunked approach from Regioner.execute (test that first)
            raise NotImplementedError

        anchors = []
        async for num in AsyncIter(self.regions.keys()):
            points = await self.get_points_from_mask(num)
            # Points are (x, y): smallest y first, then smallest x.
            top_x, top_y = min(points, key=lambda p: (p[1], p[0]))
            anchors.append(((top_y, top_x), num))
        anchors.sort(key=lambda item: item[0])

        # Rename all masks to <num>_old first so no rename can hit a name
        # that is still in use.
        async for num in AsyncIter(self.regions.keys()):
            old_mask = self.masks_path() / f"{num}.png"
            new_mask = self.masks_path() / f"{num}_old.png"
            old_mask.rename(new_mask)

        # Rename all _old masks to their new num, and build the new region dict
        new_regions = {}
        async for new_num, old_num in AsyncIter(
            enumerate((item[1] for item in anchors), start=1)
        ):
            old_mask = self.masks_path() / f"{old_num}_old.png"
            new_mask = self.masks_path() / f"{new_num}.png"
            old_mask.rename(new_mask)

            new_regions[new_num] = self.regions[old_num]

        self.regions = new_regions

        await self.save_data()

    async def get_points_from_mask(self, region):
        """Return the set of ``(x, y)`` pixels that are 0 in a region's mask."""
        mask: Image.Image = Image.open(self.masks_path() / f"{region}.png").convert(MASK_MODE)

        arr = numpy.array(mask)
        found = numpy.where(arr == 0)  # row indices (y) first, then column indices (x)

        return set(zip(found[1], found[0]))

    async def convert_masks(self, regions):
        """Re-save mask images in ``MASK_MODE``.

        :param regions: Region numbers to convert; when empty or ``None``
            every file in the masks directory is converted.
        :return: ``True``.
        """
        if regions:
            # A plain iterable has to be wrapped before ``async for`` can use it.
            async for region in AsyncIter(regions):
                mask_path = self.masks_path() / f"{region}.png"
                img: Image.Image = Image.open(mask_path).convert(MASK_MODE)
                img.save(mask_path)
        else:
            async for mask_path in AsyncIter(self.masks_path().iterdir()):
                # Don't bother checking if masks are in self.regions
                img: Image.Image = Image.open(mask_path).convert(MASK_MODE)
                img.save(mask_path, "PNG")

        return True

    async def prune_masks(self):
        """Drop orphaned mask files and renumber the regions without gaps.

        Two step process:
        1. Delete all numbered mask images that aren't in self.regions
        2. Iterate through regions numerically, renaming all mask images to
           their position in that order

        All so 1 3 4 doesn't cause 4->3 to overwrite 3->2

        :return: List of the file stems of the deleted mask images.
        """
        pruned = []

        # Step 1
        async for mask in AsyncIter(self.masks_path().iterdir(), steps=5):
            try:
                mask_num = int(mask.stem)
            except ValueError:
                # Not a numbered mask file; leave it alone rather than crash.
                continue
            if mask_num not in self.regions:
                mask.unlink()
                pruned.append(mask.stem)

        # Step 2
        # Ascending numeric order guarantees newnum <= num, so a rename can
        # never land on a mask that still has to be moved. The dict's own
        # order is not reliable for this (it is lexicographic after a reload
        # of the sort_keys'ed JSON).
        ordered = sorted(self.regions.items(), key=lambda item: item[0])

        new_regions = {}
        async for newnum, (num, data) in AsyncIter(enumerate(ordered, start=1), steps=5):
            new_regions[newnum] = data

            if newnum == num:
                continue

            old_mask = self.masks_path() / f"{num}.png"
            new_mask = self.masks_path() / f"{newnum}.png"

            old_mask.rename(new_mask)

        self.regions = new_regions
        # I could use len() here, but max to be safe; 0 when there are no regions
        self.region_max = max(self.regions, default=0)

        await self.save_data()

        return pruned
class Region:
    """One map region: its center point, its weight and any extra data.

    Keyword arguments other than ``center`` and ``weight`` are kept in
    ``self.data`` and written back out unchanged by ``get_json``.
    """

    def __init__(self, center, weight, **kwargs):
        self.center, self.weight, self.data = center, weight, kwargs

    def get_json(self):
        """Return the region as a plain dict: center, weight, then the extras."""
        payload = dict(center=self.center, weight=self.weight)
        payload.update(self.data)
        return payload
class Regioner:
    """Splits a map image into regions by flood-filling between wall pixels."""

    def __init__(
        self, filepath: pathlib.Path, filename: str, region_color=None, wall_color="black"
    ):
        """
        :param filepath: Directory containing the map image; the ``masks``
            directory and label images are written here.
        :param filename: File name of the map image inside ``filepath``.
        :param region_color: Color name of fillable pixels. ``None`` means
            every pixel that is not the wall color.
        :param wall_color: Color name of the borders between regions.
        """
        self.filepath = filepath
        self.filename = filename
        # Colors are compared as 8-bit greyscale values.
        self.wall_color = ImageColor.getcolor(wall_color, "L")
        if region_color is None:
            self.region_color = None
        else:
            self.region_color = ImageColor.getcolor(region_color, "L")

    def execute(self):
        """
        Create the regions of the map

        Scans the greyscale image row by row. Every pixel that qualifies as a
        region pixel and was not filled before seeds a flood fill; the filled
        pixels are saved as ``masks/<n>.png`` (0 inside the region, white
        elsewhere) and recorded as a Region. The label images are generated
        at the end.

        :return: ``False`` if the map image does not exist, otherwise a dict
            of region number (starting at 1) to Region.

        TODO: Using proper multithreading best practices.
        TODO: Fails on some maps where borders aren't just black (i.e. water borders vs region borders)
        """
        base_img_path = self.filepath / self.filename
        if not base_img_path.exists():
            return False

        masks_path = self.filepath / "masks"

        if not masks_path.exists():
            masks_path.mkdir()

        base_img: Image.Image = Image.open(base_img_path).convert("L")
        width, height = base_img.size

        already_processed = set()

        mask_count = 0
        regions = {}

        # Plain row-major scan. There is no chunk size to compute, so images
        # smaller than 10 pixels on a side are handled as well.
        for y1 in range(height):
            for x1 in range(width):
                if (x1, y1) in already_processed:
                    continue

                shade = base_img.getpixel((x1, y1))
                if self.region_color is None:
                    is_region_pixel = shade != self.wall_color
                else:
                    is_region_pixel = shade == self.region_color
                if not is_region_pixel:
                    continue

                # Filling with the wall color turns the finished region into
                # wall, so later seeds cannot enter it again.
                filled = floodfill(base_img, (x1, y1), self.wall_color, self.wall_color)
                if not filled:
                    continue

                # Pixels were updated, make them into a mask
                mask = Image.new(MASK_MODE, base_img.size, 255)
                mask_pixels = mask.load()
                for x2, y2 in filled:
                    mask_pixels[x2, y2] = 0

                mask_count += 1
                mask.save(masks_path / f"{mask_count}.png", "PNG")

                regions[mask_count] = Region(center=get_center(filled), weight=len(filled))

                already_processed.update(filled)

        create_number_mask(regions, self.filepath, self.filename)
        return regions