You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
Fox-V3/conquest/conquest.py

658 lines
23 KiB

import asyncio
import json
import pathlib
from io import BytesIO
from shutil import copyfile
from typing import Optional, Union
import discord
from PIL import Image, ImageColor, ImageOps
from discord.ext.commands import Greedy
from redbot.core import Config, commands
from redbot.core.bot import Red
from redbot.core.data_manager import bundled_data_path, cog_data_path
from redbot.core.utils.predicates import MessagePredicate
from conquest.regioner import ConquestMap, composite_regions
class Conquest(commands.Cog):
"""
Cog for creating and modifying maps for RPGs and War Games
"""
default_zoom_json = {"enabled": False, "x": -1, "y": -1, "zoom": 1.0}
default_custom_map = {
"name": "",
"regions": {},
"region_max": 0,
"extension": "png",
"custom": True,
}
default_maps_json = {
"maps": []
}
def __init__(self, bot: Red):
super().__init__()
self.bot = bot
self.config = Config.get_conf(
self, identifier=67111110113117101115116, force_registration=True
)
default_guild = {}
default_global = {"current_map": None, "is_custom": False}
self.config.register_guild(**default_guild)
self.config.register_global(**default_global)
self.data_path: pathlib.Path = cog_data_path(self)
5 years ago
self.custom_map_path = self.data_path / "custom_maps"
if not self.custom_map_path.exists() or not self.custom_map_path.is_dir():
self.custom_map_path.mkdir()
with (self.custom_map_path / "maps.json").open("w+") as dj:
json.dump(self.default_maps_json.copy(), dj, sort_keys=True, indent=4)
5 years ago
self.current_map_folder = self.data_path / "current_maps"
if not self.current_map_folder.exists() or not self.current_map_folder.is_dir():
self.current_map_folder.mkdir()
5 years ago
self.asset_path: Optional[pathlib.Path] = None
self.is_custom = False
self.current_map = None
self.map_data = None
self.ext = None
self.ext_format = None
self.mm: Union[ConquestMap, None] = None
async def red_delete_data_for_user(self, **kwargs):
"""Nothing to delete"""
return
def _path_if_custom(self, custom_custom: bool = None) -> pathlib.Path:
check_value = custom_custom if custom_custom is not None else self.is_custom
if check_value:
return self.custom_map_path
return self.asset_path
async def load_data(self):
"""
Initial loading of data from bundled_data_path and config
"""
self.asset_path = bundled_data_path(self) / "assets"
self.current_map = await self.config.current_map()
self.is_custom = await self.config.is_custom()
if self.current_map:
await self.current_map_load()
async def current_map_load(self):
map_path = self._path_if_custom()
map_data_path = map_path / self.current_map / "data.json"
try:
with map_data_path.open() as mapdata:
self.map_data: dict = json.load(mapdata)
except FileNotFoundError as e:
print(e)
await self.config.current_map.set(None)
return
self.ext = self.map_data["extension"]
self.ext_format = "JPEG" if self.ext.upper() == "JPG" else self.ext.upper()
5 years ago
async def _get_current_map_path(self):
return self.current_map_folder / self.current_map
5 years ago
async def _create_zoomed_map(self, map_path, x, y, zoom, **kwargs):
current_map = Image.open(map_path)
w, h = current_map.size
zoom2 = zoom * 2
zoomed_map = current_map.crop((x - w / zoom2, y - h / zoom2, x + w / zoom2, y + h / zoom2))
# zoomed_map = zoomed_map.resize((w, h), Image.LANCZOS)
current_map_path_ = await self._get_current_map_path()
zoomed_map.save(current_map_path_ / f"zoomed.{self.ext}", self.ext_format)
return current_map_path_ / f"zoomed.{self.ext}"
async def _send_maybe_zoomed_map(self, ctx, map_path, filename):
zoom_data = {"enabled": False}
zoom_json_path = await self._get_current_map_path() / "settings.json"
if zoom_json_path.exists():
with zoom_json_path.open() as zoom_json:
zoom_data = json.load(zoom_json)
if zoom_data["enabled"]:
map_path = await self._create_zoomed_map(map_path, **zoom_data)
await ctx.send(file=discord.File(fp=map_path, filename=filename))
async def _process_take_regions(self, color, ctx, regions):
current_img_path = await self._get_current_map_path() / f"current.{self.ext}"
im = Image.open(current_img_path)
async with ctx.typing():
out: Image.Image = await composite_regions(
im, regions, color, self._path_if_custom() / self.current_map
)
5 years ago
out.save(current_img_path, self.ext_format)
await self._send_maybe_zoomed_map(ctx, current_img_path, f"map.{self.ext}")
async def _mm_save_map(self, map_name, target_save):
return await self.mm.change_name(map_name, target_save)
@commands.group()
async def mapmaker(self, ctx: commands.context):
"""
Base command for managing current maps or creating new ones
"""
if ctx.invoked_subcommand is None:
pass
@mapmaker.command(name="close")
async def _mapmaker_close(self, ctx: commands.Context):
"""Close the currently open map."""
self.mm = None
await ctx.tick()
@mapmaker.command(name="save")
async def _mapmaker_save(self, ctx: commands.Context, *, map_name: str):
"""Save the current map to the specified map name"""
if not self.mm:
await ctx.maybe_send_embed("No map currently being worked on")
return
if self.mm.name == map_name:
await ctx.maybe_send_embed("This map already has that name, no reason to save")
return
target_save = self.custom_map_path / map_name
result = await self._mm_save_map(map_name, target_save)
if not result:
await ctx.maybe_send_embed("Failed to save to that name")
else:
await ctx.maybe_send_embed(f"Map successfully saved to {target_save}")
@mapmaker.command(name="upload")
async def _mapmaker_upload(self, ctx: commands.Context, map_name: str, map_path=""):
"""Load a map image to be modified. Upload one with this command or provide a path"""
message: discord.Message = ctx.message
if not message.attachments and not map_path:
await ctx.maybe_send_embed(
"Either upload an image with this command or provide a path to the image"
)
return
target_save = self.custom_map_path / map_name
if target_save.exists() and target_save.is_dir():
await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?")
pred = MessagePredicate.yes_or_no(ctx)
try:
await self.bot.wait_for("message", check=pred, timeout=30)
except TimeoutError:
await ctx.maybe_send_embed("Response timed out, cancelling save")
return
if not pred.result:
return
if not self.mm:
self.mm = ConquestMap(self.custom_map_path)
self.mm.custom = True
if map_path:
map_path = pathlib.Path(map_path)
if not map_path.exists():
await ctx.maybe_send_embed("Map not found at that path")
return
mm_img = Image.open(map_path)
elif message.attachments:
attch: discord.Attachment = message.attachments[0]
# attch_file = await attch.to_file()
buffer = BytesIO()
await attch.save(buffer)
mm_img: Image.Image = Image.open(buffer)
else:
# Wait what?
return
result = await self.mm.init_directory(map_name, target_save, mm_img)
if not result:
self.mm = None
await ctx.maybe_send_embed("Failed to upload to that name")
return
maps_json_path = self.custom_map_path / "maps.json"
with maps_json_path.open("r+") as maps:
map_data = json.load(maps)
map_data["maps"].append(map_name)
maps.seek(0)
json.dump(map_data, maps, sort_keys=True, indent=4)
await ctx.maybe_send_embed(f"Map successfully uploaded to {target_save}")
@mapmaker.command(name="sample")
async def _mapmaker_sample(self, ctx: commands.Context):
"""Print the currently being modified map as a sample"""
if not self.mm:
await ctx.maybe_send_embed("No map currently being worked on")
return
async with ctx.typing():
files = await self.mm.get_sample()
for f in files:
await ctx.send(file=discord.File(f))
@mapmaker.command(name="load")
async def _mapmaker_load(self, ctx: commands.Context, map_name: str):
"""Load an existing map to be modified."""
if self.mm:
await ctx.maybe_send_embed(
"There is a current map in progress. Close it first with `[p]mapmaker close`"
)
return
map_path = self.custom_map_path / map_name
if not map_path.exists() or not map_path.is_dir():
await ctx.maybe_send_embed(f"Map {map_name} not found in {self.custom_map_path}")
return
self.mm = ConquestMap(map_path)
await self.mm.load_data()
await ctx.tick()
@mapmaker.group(name="masks")
async def _mapmaker_masks(self, ctx: commands.Context):
"""Base command for managing map masks"""
if ctx.invoked_subcommand is None:
pass
@_mapmaker_masks.command(name="generate")
async def _mapmaker_masks_generate(self, ctx: commands.Context):
"""
Generate masks for the map
Currently only works on maps with black borders and white regions.
Non-white regions are ignored (i.e. blue water)
"""
if not self.mm:
await ctx.maybe_send_embed("No map currently being worked on")
return
masks_dir = self.mm.masks_path()
if masks_dir.exists() and masks_dir.is_dir():
await ctx.maybe_send_embed("Mask folder already exists, delete this before continuing")
return
with ctx.typing():
regions = await self.mm.generate_masks()
if not regions:
await ctx.maybe_send_embed("Failed to generate masks")
return
await ctx.maybe_send_embed(f"{len(regions)} masks generated into {masks_dir}")
@_mapmaker_masks.command(name="combine")
async def _mapmaker_masks_combine(
self, ctx: commands.Context, mask_list: Greedy[int], recommended=False
):
"""Generate masks for the map"""
if not self.mm:
await ctx.maybe_send_embed("No map currently being worked on")
return
if recommended and mask_list:
await ctx.maybe_send_embed(
"Can't combine recommend masks and a mask list at the same time, pick one"
)
return
masks_dir = self.mm.masks_path()
if not masks_dir.exists() or not masks_dir.is_dir():
await ctx.maybe_send_embed("There are no masks")
return
if not recommended:
for mask in mask_list: # TODO: switch to self.mm.regions intersection of sets
m = masks_dir / f"{mask}.png"
if not m.exists():
await ctx.maybe_send_embed(f"Mask #{mask} does not exist")
return
else:
await ctx.send("Not Implemented")
return
result = await self.mm.combine_masks(mask_list)
if not result:
await ctx.maybe_send_embed(
"Failed to combine masks, try the command again or check log for errors"
)
return
await ctx.tick()
@commands.group()
async def conquest(self, ctx: commands.Context):
"""
Base command for conquest cog. Start with `[p]conquest set map` to select a map.
"""
if ctx.invoked_subcommand is None:
if self.current_map is not None:
await self._conquest_current(ctx)
@conquest.command(name="list")
async def _conquest_list(self, ctx: commands.Context):
"""
List currently available maps
"""
maps_json = self.asset_path / "maps.json"
with maps_json.open() as maps:
maps_json = json.load(maps)
map_list = maps_json["maps"]
maps_json = self.custom_map_path / "maps.json"
if maps_json.exists():
with maps_json.open() as maps:
maps_json = json.load(maps)
custom_map_list = maps_json["maps"]
map_list = "\n".join(map_list)
custom_map_list = "\n".join(custom_map_list)
await ctx.maybe_send_embed(f"Current maps:\n{map_list}\n\nCustom maps:\n{custom_map_list}")
@conquest.group(name="set")
async def conquest_set(self, ctx: commands.Context):
"""Base command for admin actions like selecting a map"""
if ctx.invoked_subcommand is None:
pass
@conquest_set.command(name="resetzoom")
async def _conquest_set_resetzoom(self, ctx: commands.Context):
"""Resets the zoom level of the current map"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
5 years ago
zoom_json_path = await self._get_current_map_path() / "settings.json"
if not zoom_json_path.exists():
await ctx.maybe_send_embed(
f"No zoom data found for {self.current_map}, reset not needed"
)
return
with zoom_json_path.open("w+") as zoom_json:
json.dump({"enabled": False}, zoom_json, sort_keys=True, indent=4)
await ctx.tick()
@conquest_set.command(name="zoom")
async def _conquest_set_zoom(self, ctx: commands.Context, x: int, y: int, zoom: float):
"""
Set the zoom level and position of the current map
x: positive integer
y: positive integer
zoom: float greater than or equal to 1
"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
if x < 0 or y < 0 or zoom < 1:
await ctx.send_help()
return
5 years ago
zoom_json_path = await self._get_current_map_path() / "settings.json"
zoom_data = self.default_zoom_json.copy()
zoom_data["enabled"] = True
zoom_data["x"] = x
zoom_data["y"] = y
zoom_data["zoom"] = zoom
with zoom_json_path.open("w+") as zoom_json:
json.dump(zoom_data, zoom_json, sort_keys=True, indent=4)
await ctx.tick()
@conquest_set.command(name="zoomtest")
async def _conquest_set_zoomtest(self, ctx: commands.Context, x: int, y: int, zoom: float):
"""
Test the zoom level and position of the current map
x: positive integer
y: positive integer
zoom: float greater than or equal to 1
"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
if x < 0 or y < 0 or zoom < 1:
await ctx.send_help()
return
zoomed_path = await self._create_zoomed_map(
5 years ago
await self._get_current_map_path() / f"current.{self.ext}", x, y, zoom
)
await ctx.send(file=discord.File(fp=zoomed_path, filename=f"current_zoomed.{self.ext}",))
@conquest_set.command(name="save")
async def _conquest_set_save(self, ctx: commands.Context, *, save_name):
"""Save the current map to be loaded later"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
5 years ago
current_map_folder = await self._get_current_map_path()
current_map = current_map_folder / f"current.{self.ext}"
if not current_map_folder.exists() or not current_map.exists():
await ctx.maybe_send_embed("Current map doesn't exist! Try setting a new one")
return
copyfile(current_map, current_map_folder / f"{save_name}.{self.ext}")
await ctx.tick()
@conquest_set.command(name="load")
async def _conquest_set_load(self, ctx: commands.Context, *, save_name):
"""Load a saved map to be the current map"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
5 years ago
current_map_folder = await self._get_current_map_path()
current_map = current_map_folder / f"current.{self.ext}"
saved_map = current_map_folder / f"{save_name}.{self.ext}"
if not current_map_folder.exists() or not saved_map.exists():
await ctx.maybe_send_embed(f"Saved map not found in the {self.current_map} folder")
return
copyfile(saved_map, current_map)
await ctx.tick()
@conquest_set.command(name="map")
async def _conquest_set_map(
self, ctx: commands.Context, mapname: str, is_custom: bool = False, reset: bool = False
):
"""
Select a map from current available maps
To add more maps, see the guide (WIP)
"""
check_path = self._path_if_custom(is_custom)
map_dir = check_path / mapname
if not map_dir.exists() or not map_dir.is_dir():
await ctx.maybe_send_embed(
f"Map `{mapname}` was not found in the {check_path} directory"
)
return
self.current_map = mapname
self.is_custom = is_custom
await self.config.current_map.set(self.current_map) # Save to config too
await self.config.is_custom.set(is_custom)
await self.current_map_load()
5 years ago
current_map_folder = await self._get_current_map_path()
current_map = current_map_folder / f"current.{self.ext}"
if not reset and current_map.exists():
await ctx.maybe_send_embed(
"This map is already in progress, resuming from last game\n"
"Use `[p]conquest set map [mapname] True` to start a new game"
)
else:
if not current_map_folder.exists():
current_map_folder.mkdir()
copyfile(check_path / mapname / f"blank.{self.ext}", current_map)
await ctx.tick()
@conquest.command(name="current")
async def _conquest_current(self, ctx: commands.Context):
"""
Send the current map.
"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
5 years ago
current_img = await self._get_current_map_path() / f"current.{self.ext}"
await self._send_maybe_zoomed_map(ctx, current_img, f"current_map.{self.ext}")
@conquest.command("blank")
async def _conquest_blank(self, ctx: commands.Context):
"""
Print the blank version of the current map, for reference.
"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
current_blank_img = self._path_if_custom() / self.current_map / f"blank.{self.ext}"
await self._send_maybe_zoomed_map(ctx, current_blank_img, f"blank_map.{self.ext}")
@conquest.command("numbered")
async def _conquest_numbered(self, ctx: commands.Context):
"""
Print the numbered version of the current map, for reference.
"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
numbers_path = self._path_if_custom() / self.current_map / f"numbers.{self.ext}"
if not numbers_path.exists():
await ctx.send(
file=discord.File(
fp=self._path_if_custom() / self.current_map / f"numbered.{self.ext}",
filename=f"numbered.{self.ext}",
)
)
return
current_map_path = await self._get_current_map_path()
5 years ago
current_map = Image.open(current_map_path / f"current.{self.ext}")
numbers = Image.open(numbers_path).convert("L")
inverted_map = ImageOps.invert(current_map)
loop = asyncio.get_running_loop()
current_numbered_img = await loop.run_in_executor(
None, Image.composite, current_map, inverted_map, numbers
)
current_numbered_img.save(
5 years ago
current_map_path / f"current_numbered.{self.ext}", self.ext_format
)
await self._send_maybe_zoomed_map(
5 years ago
ctx, current_map_path / f"current_numbered.{self.ext}", f"current_numbered.{self.ext}",
)
@conquest.command(name="multitake")
async def _conquest_multitake(
self, ctx: commands.Context, start_region: int, end_region: int, color: str
):
"""
Claim all the territories between the two provided region numbers (inclusive)
:param start_region:
"""
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
try:
color = ImageColor.getrgb(color)
except ValueError:
await ctx.maybe_send_embed(f"Invalid color {color}")
return
if end_region > self.map_data["region_max"] or start_region < 1:
await ctx.maybe_send_embed(
f"Max region number is {self.map_data['region_max']}, minimum is 1"
)
return
if start_region < end_region:
start_region, end_region = end_region, start_region
regions = [r for r in range(start_region, end_region + 1)]
await self._process_take_regions(color, ctx, regions)
@conquest.command(name="take")
async def _conquest_take(self, ctx: commands.Context, regions: Greedy[int], *, color: str):
"""
Claim a territory or list of territories for a specified color
:param regions: List of integer regions
:param color: Color to claim regions
"""
if not regions:
await ctx.send_help()
return
if self.current_map is None:
await ctx.maybe_send_embed("No map is currently set. See `[p]conquest set map`")
return
try:
color = ImageColor.getrgb(color)
except ValueError:
await ctx.maybe_send_embed(f"Invalid color {color}")
return
for region in regions:
if region > self.map_data["region_max"] or region < 1:
await ctx.maybe_send_embed(
f"Max region number is {self.map_data['region_max']}, minimum is 1"
)
return
await self._process_take_regions(color, ctx, regions)