You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
Fox-V3/conquest/conquest.py

790 lines
28 KiB

import asyncio
import json
import logging
import pathlib
from collections import defaultdict
from io import BytesIO
from shutil import copyfile
from typing import Optional, Union, Dict
import discord
from PIL import Image, ImageColor, ImageOps, ImageFont
from discord.ext.commands import Greedy
from redbot.core import Config, commands
from redbot.core.commands import Context
from redbot.core.bot import Red
from redbot.core.data_manager import bundled_data_path, cog_data_path
from redbot.core.utils.predicates import MessagePredicate
from conquest import regioner
from conquest.conquestgame import ConquestGame
# Message sent by the `conquest` commands when the guild has no current game.
ERROR_CONQUEST_SET_MAP = "No map is currently set. See `[p]conquest set map`"
# Module-level logger for this cog.
log = logging.getLogger("red.fox_v3.conquest")
class Conquest(commands.Cog):
    """
    Create and modify maps for RPGs and War Games
    """

    # Skeleton written to a freshly created custom-map index (maps.json).
    default_maps_json = {"maps": []}

    # Template copied when recording a guild's current game.
    # Usage: await self.config.games.get_raw("game_name", "is_custom")
    default_games = {"map_name": None, "is_custom": False}

    # Image extension (used for the zoom-test attachment name) and, presumably,
    # the matching image format name.
    ext = ext_format = "PNG"
def __init__(self, bot: Red):
    """
    Register config defaults and make sure the on-disk data folders exist.

    :param bot: The Red instance this cog is attached to
    """
    super().__init__()
    self.bot = bot
    self.config = Config.get_conf(
        self, identifier=67_111_110_113_117_101_115_116, force_registration=True
    )

    default_guild = {"current_game": None}
    default_global = {"games": {}}
    self.config.register_guild(**default_guild)
    self.config.register_global(**default_global)

    self.data_path: pathlib.Path = cog_data_path(self)

    self.custom_map_folder = self.data_path / "custom_maps"
    self.custom_map_folder.mkdir(parents=True, exist_ok=True)

    # Seed the custom map index whenever it is missing, not only when the
    # folder itself was just created; the upload command opens it with "r+"
    # and would fail on a folder that exists without an index.
    maps_index = self.custom_map_folder / "maps.json"
    if not maps_index.exists():
        with maps_index.open("w") as dj:
            json.dump(self.default_maps_json.copy(), dj, sort_keys=True, indent=4)

    self.current_map_folder = self.data_path / "current_maps"
    self.current_map_folder.mkdir(parents=True, exist_ok=True)

    # Set by load_data() once the bundled data path can be resolved.
    self.asset_path: Optional[pathlib.Path] = None

    # key: guild_id; guilds without a game read as None
    self.current_games: Dict[int, Optional[ConquestGame]] = defaultdict(lambda: None)

    self.map_data = {}  # key, value = guild.id, ConquestGame

    # The single map currently open in the map maker, if any.
    self.mm: Optional[regioner.MapMaker] = None
async def red_delete_data_for_user(self, **kwargs):
    """Data-deletion hook; there is nothing to delete."""
    return None
def _path_if_custom(self, custom_custom: bool = None) -> pathlib.Path:
check_value = custom_custom # if custom_custom is not None else self.is_custom
if check_value:
return self.custom_map_folder
return self.asset_path
async def load_data(self):
    """
    Resolve the bundled assets, restore each guild's saved game and load the map font.
    """
    bundle_root = bundled_data_path(self)
    self.asset_path = bundle_root / "assets"

    # Restore the game recorded in config for every guild the bot is in.
    for guild in self.bot.guilds:
        saved_game = await self.config.guild(guild).current_game()
        if saved_game is None:
            continue
        await self.load_guild_data(guild, **saved_game)

    font_file = bundle_root / "fonts" / "pixels" / "Pixels.ttf"
    regioner.MAP_FONT = ImageFont.truetype(str(font_file), size=16)
async def load_guild_data(self, guild: discord.Guild, map_name: str, is_custom: bool):
    """
    Rebuild the in-memory game of a guild from its folder of current maps.

    :param guild: Guild whose game should be restored
    :param map_name: Name of the map the game is played on
    :param is_custom: Whether the map is looked up in the custom map folder
    :return: True when a game was registered, False when the guild has no
        folder for this map
    """
    source_dir = self._path_if_custom(is_custom) / map_name
    guild_dir = self.current_map_folder / str(guild.id)
    game_dir = guild_dir / map_name

    if not (guild_dir.exists() and game_dir.exists()):
        return False

    self.current_games[guild.id] = ConquestGame(source_dir, map_name, game_dir)
    return True
# async def current_map_load(self):
# map_path = self._path_if_custom()
# self.map_data = ConquestMap(map_path / self.current_map)
# await self.map_data.load_data()
# # map_data_path = map_path / self.current_map / "data.json"
# # try:
# # with map_data_path.open() as mapdata:
# # self.map_data: dict = json.load(mapdata)
# # except FileNotFoundError as e:
# # print(e)
# # await self.config.current_map.set(None)
# # return
# async def _get_current_map_folder(self, guild):
# return self.current_map_folder / guild.id / self.current_map
async def _mm_save_map(self, map_name, target_save):
    """
    Ask the open map maker to change its name and location.

    :param map_name: New name for the map
    :param target_save: Path passed through to the map maker as the new location
    :return: Whatever ``change_name`` returns; callers treat it as a success flag
    """
    outcome = await self.mm.change_name(map_name, target_save)
    return outcome
@commands.is_owner()
@commands.group()
async def mapmaker(self, ctx: Context):
    """
    Base command for managing current maps or creating new ones
    """
    # The group itself does nothing; all behaviour lives in the subcommands.
@mapmaker.command(name="numbers")
async def _mapmaker_numbers(self, ctx: Context):
    """Regenerates the number mask and puts it in the channel"""
    if self.mm:
        async with ctx.typing():
            await self.mm.create_number_mask()
            numbered = await self.mm.get_blank_numbered_file()
            await ctx.send(file=discord.File(numbered, filename="map.png"))
    else:
        await ctx.maybe_send_embed("No map currently being worked on")
@mapmaker.command(name="close")
async def _mapmaker_close(self, ctx: Context):
    """Close the currently open map."""
    # Dropping the reference is all that "closing" does here.
    self.mm = None
    await ctx.tick()
@mapmaker.group(name="debug")
async def _mapmaker_debug(self, ctx: Context):
    """Debug commands for making maps. Don't use unless directed to."""
    # Group container only; see the subcommands.
@_mapmaker_debug.command(name="recalculate")
async def _mapmaker_debug_recalculate(self, ctx: Context, mask_list: Greedy[int]):
    """Recalculate the center point and weight for the given regions.

    Processes all regions if no region is specified."""
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    async with ctx.typing():
        # mask_list is passed through as given (possibly empty).
        await self.mm.recalculate_region(mask_list)

    await ctx.tick()
@_mapmaker_debug.command(name="sort")
async def _mapmaker_debug_sort(self, ctx: Context):
    """Sorts the regions by position on the map."""
    if self.mm:
        async with ctx.typing():
            await self.mm.sort_regions()
        await ctx.tick()
    else:
        await ctx.maybe_send_embed("No map currently being worked on")
@mapmaker.command(name="save")
async def _mapmaker_save(self, ctx: Context, *, map_name: str):
    """Save the current map to a different map name"""
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    same_name = self.mm.name == map_name
    if same_name:
        await ctx.maybe_send_embed("This map already has that name, no reason to save")
        return

    target_save = self.custom_map_folder / map_name
    saved = await self._mm_save_map(map_name, target_save)

    # Report the outcome; the success text includes the destination path.
    if saved:
        await ctx.maybe_send_embed(f"Map successfully saved to {target_save}")
    else:
        await ctx.maybe_send_embed("Failed to save to that name")
@mapmaker.command(name="upload")
async def _mapmaker_upload(self, ctx: Context, map_name: str, path_to_image=""):
    """Load a map image to be modified. Upload one with this command or provide a path"""
    message: discord.Message = ctx.message
    if not message.attachments and not path_to_image:
        await ctx.maybe_send_embed(
            "Either upload an image with this command or provide a path to the image"
        )
        return

    target_save = self.custom_map_folder / map_name

    if target_save.exists() and target_save.is_dir():
        await ctx.maybe_send_embed(f"{map_name} already exists, okay to overwrite?")
        pred = MessagePredicate.yes_or_no(ctx)
        try:
            await self.bot.wait_for("message", check=pred, timeout=30)
        except asyncio.TimeoutError:
            # wait_for raises asyncio.TimeoutError, which is only the same class
            # as the builtin TimeoutError on Python 3.11+.
            await ctx.maybe_send_embed("Response timed out, cancelling save")
            return
        if not pred.result:
            return

    if self.mm:  # Only one map can be worked on at a time
        await ctx.maybe_send_embed(
            "An existing map is in progress, close it before opening a new one. (`[p]mapmaker close`)"
        )
        return

    async with ctx.typing():
        # Claim the editor slot before the first await so a second upload
        # cannot start in parallel; it is released again on every failure path.
        self.mm = regioner.MapMaker(target_save)
        self.mm.custom = True

        try:
            if path_to_image:
                path_to_image = pathlib.Path(path_to_image)
                if not path_to_image.exists():
                    self.mm = None
                    await ctx.maybe_send_embed("Map not found at that path")
                    return
                mm_img = Image.open(path_to_image)
            else:
                # The guard at the top guarantees an attachment when no path is given.
                attch: discord.Attachment = message.attachments[0]
                buffer = BytesIO()
                await attch.save(buffer)
                mm_img: Image.Image = Image.open(buffer)

            if mm_img.mode == "P":
                # Maybe convert to L to prevent RGB?
                mm_img = mm_img.convert()  # No P mode, convert it

            result = await self.mm.init_directory(map_name, target_save, mm_img)
        except Exception:
            # Do not leave a half-initialised map blocking future uploads.
            self.mm = None
            raise

        if not result:
            self.mm = None
            await ctx.maybe_send_embed("Failed to upload to that name")
            return

        maps_json_path = self.custom_map_folder / "maps.json"
        with maps_json_path.open("r+") as maps:
            map_data = json.load(maps)
            # Overwriting an existing map must not list its name twice.
            if map_name not in map_data["maps"]:
                map_data["maps"].append(map_name)
                maps.seek(0)
                json.dump(map_data, maps, sort_keys=True, indent=4)
                maps.truncate()

    await ctx.maybe_send_embed(f"Map successfully uploaded to {target_save}")
@mapmaker.command(name="sample")
async def _mapmaker_sample(self, ctx: Context, region: int = None):
    """Print the currently being modified map as a sample"""
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    # A region is optional, but when given it has to be a known one.
    unknown_region = region is not None and region not in self.mm.regions
    if unknown_region:
        await ctx.send("This region doesn't exist or was deleted")
        return

    async with ctx.typing():
        samples = await self.mm.get_sample(region)
        for sample in samples:
            await ctx.send(file=discord.File(sample, filename="map.png"))

    await ctx.tick()
@mapmaker.command(name="region")
async def _mapmaker_region(self, ctx: Context, region: int, print_number: bool = False):
    """Print a sample of a single region of the map being modified

    Set print_number to True to request the region number on the sample"""
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    if region not in self.mm.regions:
        await ctx.send("This region doesn't exist or was deleted")
        return

    async with ctx.typing():
        files = await self.mm.sample_region(region, print_number)
        for f in files:
            await ctx.send(file=discord.File(f, filename="map.png"))

    await ctx.tick()
@mapmaker.command(name="load")
async def _mapmaker_load(self, ctx: Context, map_name: str):
    """Load an existing map to be modified."""
    if self.mm:
        await ctx.maybe_send_embed(
            "There is a current map in progress. Close it first with `[p]mapmaker close`"
        )
        return

    map_path = self.custom_map_folder / map_name
    if not (map_path.exists() and map_path.is_dir()):
        await ctx.maybe_send_embed(f"Map {map_name} not found in {self.custom_map_folder}")
        return

    loaded = regioner.MapMaker(map_path)
    self.mm = loaded
    # NOTE(review): load_data() is called without `await`; if MapMaker.load_data
    # is a coroutine function this never actually runs - confirm against regioner.
    loaded.load_data()

    await ctx.tick()
@mapmaker.group(name="masks")
async def _mapmaker_masks(self, ctx: Context):
    """Base command for managing map masks"""
    # Group container only; the subcommands do the work.
@_mapmaker_masks.command(name="generate")
async def _mapmaker_masks_generate(self, ctx: Context):
    """
    Generate masks for the map
    Currently only works on maps with black borders and white regions.
    Non-white regions are ignored (i.e. blue water)
    """
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    masks_dir = self.mm.masks_path()
    if masks_dir.exists() and masks_dir.is_dir():
        await ctx.maybe_send_embed("Mask folder already exists, delete this before continuing")
        return

    # Use the async form like every other command in this cog; the plain
    # `with` form of typing() is not available in discord.py 2.x.
    async with ctx.typing():
        regions = await self.mm.generate_masks()

    if not regions:
        await ctx.maybe_send_embed("Failed to generate masks")
        return

    await ctx.maybe_send_embed(f"{len(regions)} masks generated into {masks_dir}")
@_mapmaker_masks.command(name="delete")
async def _mapmaker_masks_delete(self, ctx: Context, mask_list: Greedy[int]):
    """
    Delete the listed masks from the map
    """
    if not mask_list:
        await ctx.send_help()
        return

    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    masks_dir = self.mm.masks_path()
    if not (masks_dir.exists() and masks_dir.is_dir()):
        await ctx.maybe_send_embed("There are no masks")
        return

    async with ctx.typing():
        deleted = await self.mm.delete_masks(mask_list)

    if not deleted:
        await ctx.maybe_send_embed("Failed to delete masks")
    else:
        await ctx.maybe_send_embed(f"Delete masks: {mask_list}")
@_mapmaker_masks.command(name="cleanup", aliases=["reduce", "prune"])
async def _mapmaker_masks_prune(self, ctx: Context):
    """
    Removes deleted masks, and reduces the ID numbers down to the lowest available.
    Warning: Backup your files before running this. Any errors will break your masks.
    """
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    masks_dir = self.mm.masks_path()
    if not (masks_dir.exists() and masks_dir.is_dir()):
        await ctx.maybe_send_embed("There are no masks")
        return

    async with ctx.typing():
        pruned = await self.mm.prune_masks()

    # A falsy result is reported as a failure.
    if not pruned:
        await ctx.maybe_send_embed("Failed to cleanup masks")
    else:
        await ctx.maybe_send_embed(f"Pruned masks: {pruned}")
@_mapmaker_masks.command(name="convert")
async def _mapmaker_masks_convert(self, ctx: Context, mask_list: Greedy[int]):
    """
    Converts the provided mask images into 1-bit black-only masks.
    Processes all masks if mask_list is empty
    Warning: This may take a while.
    """
    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    masks_dir = self.mm.masks_path()
    if not (masks_dir.exists() and masks_dir.is_dir()):
        await ctx.maybe_send_embed("There are no masks")
        return

    async with ctx.typing():
        converted = await self.mm.convert_masks(mask_list)

    if not converted:
        await ctx.maybe_send_embed("Failed to convert masks")
    else:
        await ctx.maybe_send_embed("All masks converted")
@_mapmaker_masks.command(name="merge", aliases=["combine"])
async def _mapmaker_masks_combine(
    self, ctx: Context, mask_list: Greedy[int], recommended=False
):
    """Merge masks into a single mask"""
    if not (mask_list or recommended):
        await ctx.send_help()
        return

    if not self.mm:
        await ctx.maybe_send_embed("No map currently being worked on")
        return

    if recommended and mask_list:
        await ctx.maybe_send_embed(
            "Can't combine recommend masks and a mask list at the same time, pick one"
        )
        return

    masks_dir = self.mm.masks_path()
    if not (masks_dir.exists() and masks_dir.is_dir()):
        await ctx.maybe_send_embed("There are no masks")
        return

    if recommended:
        await ctx.send("Not Implemented")
        return

    # Stop at the first requested mask that has no image on disk.
    # TODO: switch to self.mm.regions intersection of sets
    missing = next(
        (mask for mask in mask_list if not (masks_dir / f"{mask}.png").exists()), None
    )
    if missing is not None:
        await ctx.maybe_send_embed(f"Mask #{missing} does not exist")
        return

    async with ctx.typing():
        result = await self.mm.combine_masks(mask_list)

    if result:
        await ctx.maybe_send_embed(f"Combined masks into mask # {result}")
    else:
        await ctx.maybe_send_embed(
            "Failed to combine masks, try the command again or check log for errors"
        )
@commands.group()
async def conquest(self, ctx: Context):
    """
    Base command for conquest cog. Start with `[p]conquest set map` to select a map.
    """
    # Intentionally empty: showing the current map on a bare invocation is disabled.
@conquest.command(name="list")
async def _conquest_list(self, ctx: Context):
    """
    List maps available for starting a Conquest game.
    """
    bundled_index = self.asset_path / "maps.json"
    with bundled_index.open() as maps:
        map_list = json.load(maps)["maps"]

    # The custom index is optional; default to an empty list so the names
    # below are always bound even when the file is missing.
    custom_map_list = []
    custom_index = self.custom_map_folder / "maps.json"
    if custom_index.exists():
        with custom_index.open() as maps:
            custom_map_list = json.load(maps)["maps"]

    map_list = "\n".join(map_list)
    custom_map_list = "\n".join(custom_map_list)

    await ctx.maybe_send_embed(f"Current maps:\n{map_list}\n\nCustom maps:\n{custom_map_list}")
@conquest.group(name="set")
async def conquest_set(self, ctx: Context):
    """Base command for admin actions like selecting a map"""
    # Group container only; the subcommands do the work.
@conquest_set.command(name="resetzoom")
async def _conquest_set_resetzoom(self, ctx: Context):
    """Resets the zoom level of the current map"""
    game = self.current_games[ctx.guild.id]
    if game is None:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
        return

    was_reset = await game.reset_zoom()
    if not was_reset:
        await ctx.maybe_send_embed("No zoom data found, reset not needed")

    await ctx.tick()
@conquest_set.command(name="zoom")
async def _conquest_set_zoom(self, ctx: Context, x: int, y: int, zoom: float):
    """
    Set the zoom level and position of the current map
    x: positive integer
    y: positive integer
    zoom: float greater than or equal to 1
    """
    game = self.current_games[ctx.guild.id]
    if game is None:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
        return

    # Negative coordinates or a zoom below 1 get the help text instead.
    if min(x, y) < 0 or zoom < 1:
        await ctx.send_help()
        return

    await game.set_zoom(x, y, zoom)
    await ctx.tick()
@conquest_set.command(name="zoomtest")
async def _conquest_set_zoomtest(self, ctx: Context, x: int, y: int, zoom: float):
    """
    Test the zoom level and position of the current map
    x: positive integer
    y: positive integer
    zoom: float greater than or equal to 1
    """
    game = self.current_games[ctx.guild.id]
    if game is None:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
        return

    if min(x, y) < 0 or zoom < 1:
        await ctx.send_help()
        return

    # UNUSED: out_of_date marks zoom as outdated, since this overwrites the temp
    zoomed_path = await game.create_zoomed_map(
        x, y, zoom, game.current_map, game.zoomed_current_map
    )

    attachment = discord.File(fp=zoomed_path, filename=f"test_zoom.{self.ext}")
    await ctx.send(file=attachment)
@conquest_set.command(name="save")
async def _conquest_set_save(self, ctx: Context, *, save_name):
    """Save the current map to be loaded later"""
    game = self.current_games[ctx.guild.id]
    if game is not None:
        await game.save_as(save_name)
        await ctx.tick()
    else:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
@conquest_set.command(name="load")
async def _conquest_set_load(self, ctx: Context, *, save_name):
    """Load a saved map to be the current map"""
    game = self.current_games[ctx.guild.id]
    if game is None:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
        return

    found = await game.load_from(save_name)
    if found:
        await ctx.tick()
    else:
        await ctx.maybe_send_embed("Saved map not found, check your spelling")
@conquest_set.command(name="map")
async def _conquest_set_map(
    self, ctx: Context, mapname: str, reset: bool = False, is_custom: bool = False
):
    """
    Select a map from current available maps
    To add more maps, see the guide (WIP)
    """
    check_path = self._path_if_custom(is_custom)
    map_dir = check_path / mapname
    if not (map_dir.exists() and map_dir.is_dir()):
        await ctx.maybe_send_embed(
            f"Map `{mapname}` was not found in the {check_path} directory"
        )
        return

    guild_folder = self.current_map_folder / str(ctx.guild.id)
    if not guild_folder.exists():
        guild_folder.mkdir()

    # Register the game for this guild first, then let it resume or reset.
    game = ConquestGame(map_dir, mapname, guild_folder / mapname)
    self.current_games[ctx.guild.id] = game
    await game.resume_game(ctx, reset)

    # Persist the selection so load_data() can pass it to load_guild_data().
    new_game = {**self.default_games, "map_name": mapname, "is_custom": is_custom}
    await self.config.guild(ctx.guild).current_game.set(new_game)

    await ctx.tick()
@conquest.command(name="current")
async def _conquest_current(self, ctx: Context):
    """
    Send the current map.
    """
    game = self.current_games[ctx.guild.id]
    if game is not None:
        async with ctx.typing():
            current_file = await game.get_maybe_zoomed_map("current")
            await ctx.send(file=current_file)
    else:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
@conquest.command("blank")
async def _conquest_blank(self, ctx: Context):
    """
    Print the blank version of the current map, for reference.
    """
    game = self.current_games[ctx.guild.id]
    if game is not None:
        async with ctx.typing():
            blank_file = await game.get_maybe_zoomed_map("blank")
            await ctx.send(file=blank_file)
    else:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
@conquest.command("numbered")
async def _conquest_numbered(self, ctx: Context):
    """
    Print the numbered version of the current map, for reference.
    """
    game = self.current_games[ctx.guild.id]
    if game is not None:
        async with ctx.typing():
            numbered_file = await game.get_maybe_zoomed_map("numbered")
            await ctx.send(file=numbered_file)
    else:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
@conquest.command(name="multitake")
async def _conquest_multitake(
    self, ctx: Context, start_region: int, end_region: int, color: str
):
    """
    Claim all the territories between the two provided region numbers (inclusive)
    """
    current_game = self.current_games[ctx.guild.id]
    if current_game is None:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
        return

    try:
        color = ImageColor.getrgb(color)
    except ValueError:
        # `color` is still the user's original text here
        await ctx.maybe_send_embed(f"Invalid color {color}")
        return

    # Accept the bounds in either order
    if start_region > end_region:
        start_region, end_region = end_region, start_region

    if end_region > current_game.region_max or start_region < 1:
        await ctx.maybe_send_embed(
            f"Max region number is {current_game.region_max}, minimum is 1"
        )
        return

    regions = list(range(start_region, end_region + 1))

    async with ctx.typing():
        await current_game._process_take_regions(color, regions)
        map_file = await current_game.get_maybe_zoomed_map("current")
        await ctx.send(file=map_file)
@conquest.command(name="take")
async def _conquest_take(self, ctx: Context, regions: Greedy[int], *, color: str):
    """
    Claim a territory or list of territories for a specified color
    :param regions: List of integer regions
    :param color: Color to claim regions
    """
    if not regions:
        await ctx.send_help()
        return

    game = self.current_games[ctx.guild.id]
    if game is None:
        await ctx.maybe_send_embed(ERROR_CONQUEST_SET_MAP)
        return

    try:
        color = ImageColor.getrgb(color)
    except ValueError:
        await ctx.maybe_send_embed(f"Invalid color {color}")
        return

    # Reject the whole request as soon as one region is out of range.
    if any(region > game.region_max or region < 1 for region in regions):
        await ctx.maybe_send_embed(
            f"Max region number is {game.region_max}, minimum is 1"
        )
        return

    async with ctx.typing():
        await game._process_take_regions(color, regions)
        updated_map = await game.get_maybe_zoomed_map("current")
        await ctx.send(file=updated_map)