- Players and AI can sacrifice their own units and upload them to the multiworld. - Players and AI can summon random units from the multiworld. - Has 4 new separate options for how many sacrifices and summons either the player or the AI can make per level attempt. - New /sacrifice_summon command to toggle sacrifices and summons on/off. Useful if the AI makes a level impossible with their summons. - Linux Support. - Is an apworld now. --------- Co-authored-by: Raspberry Floof <raspberry@rosenthalcastle.org> Co-authored-by: KScl <ks@rosenthalcastle.org> Co-authored-by: Abigail Fox <Raspberryfloof@users.noreply.github.com> Co-authored-by: qwint <qwint.42@gmail.com> Co-authored-by: Fabian Dill <Berserker66@users.noreply.github.com>
		
			
				
	
	
		
			595 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			595 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import annotations
 | 
						|
 | 
						|
import atexit
 | 
						|
import os
 | 
						|
import pkgutil
 | 
						|
import sys
 | 
						|
import asyncio
 | 
						|
import random
 | 
						|
import typing
 | 
						|
from typing import Tuple, List, Iterable, Dict
 | 
						|
 | 
						|
from . import WargrooveWorld
 | 
						|
from .Items import item_table, faction_table, CommanderData, ItemData
 | 
						|
 | 
						|
import ModuleUpdate
 | 
						|
ModuleUpdate.update()
 | 
						|
 | 
						|
import Utils
 | 
						|
import json
 | 
						|
import logging
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    Utils.init_logging("WargrooveClient", exception_logger="Client")
 | 
						|
 | 
						|
from NetUtils import ClientStatus
 | 
						|
from CommonClient import gui_enabled, logger, get_base_parser, ClientCommandProcessor, \
 | 
						|
    CommonContext, server_loop
 | 
						|
 | 
						|
wg_logger = logging.getLogger("WG")
 | 
						|
 | 
						|
 | 
						|
class WargrooveClientCommandProcessor(ClientCommandProcessor):
 | 
						|
    def _cmd_sacrifice_summon(self):
 | 
						|
        """Toggles sacrifices and summons On/Off"""
 | 
						|
        if isinstance(self.ctx, WargrooveContext):
 | 
						|
            self.ctx.has_sacrifice_summon = not self.ctx.has_sacrifice_summon
 | 
						|
            if self.ctx.has_sacrifice_summon:
 | 
						|
                self.output(f"Sacrifices and summons are enabled.")
 | 
						|
            else:
 | 
						|
                unit_summon_response_file = os.path.join(self.ctx.game_communication_path, "unitSummonResponse")
 | 
						|
                if os.path.exists(unit_summon_response_file):
 | 
						|
                    os.remove(unit_summon_response_file)
 | 
						|
                self.output(f"Sacrifices and summons are disabled.")
 | 
						|
 | 
						|
    def _cmd_deathlink(self):
 | 
						|
        """Toggles deathlink On/Off"""
 | 
						|
        if isinstance(self.ctx, WargrooveContext):
 | 
						|
            self.ctx.has_death_link = not self.ctx.has_death_link
 | 
						|
            Utils.async_start(self.ctx.update_death_link(self.ctx.has_death_link), name="Update Deathlink")
 | 
						|
            if self.ctx.has_death_link:
 | 
						|
                death_link_send_file = os.path.join(self.ctx.game_communication_path, "deathLinkSend")
 | 
						|
                if os.path.exists(death_link_send_file):
 | 
						|
                    os.remove(death_link_send_file)
 | 
						|
                self.output(f"Deathlink enabled.")
 | 
						|
            else:
 | 
						|
                death_link_receive_file = os.path.join(self.ctx.game_communication_path, "deathLinkReceive")
 | 
						|
                if os.path.exists(death_link_receive_file):
 | 
						|
                    os.remove(death_link_receive_file)
 | 
						|
                self.output(f"Deathlink disabled.")
 | 
						|
 | 
						|
    def _cmd_resync(self):
 | 
						|
        """Manually trigger a resync."""
 | 
						|
        self.output(f"Syncing items.")
 | 
						|
        self.ctx.syncing = True
 | 
						|
 | 
						|
    def _cmd_commander(self, *commander_name: Iterable[str]):
 | 
						|
        """Set the current commander to the given commander."""
 | 
						|
        if commander_name:
 | 
						|
            self.ctx.set_commander(' '.join(commander_name))
 | 
						|
        else:
 | 
						|
            if self.ctx.can_choose_commander:
 | 
						|
                commanders = self.ctx.get_commanders()
 | 
						|
                wg_logger.info('Unlocked commanders: ' +
 | 
						|
                               ', '.join((commander.name for commander, unlocked in commanders if unlocked)))
 | 
						|
                wg_logger.info('Locked commanders: ' +
 | 
						|
                               ', '.join((commander.name for commander, unlocked in commanders if not unlocked)))
 | 
						|
            else:
 | 
						|
                wg_logger.error('Cannot set commanders in this game mode.')
 | 
						|
 | 
						|
 | 
						|
class WargrooveContext(CommonContext):
 | 
						|
    command_processor: int = WargrooveClientCommandProcessor
 | 
						|
    game = "Wargroove"
 | 
						|
    items_handling = 0b111  # full remote
 | 
						|
    current_commander: CommanderData = faction_table["Starter"][0]
 | 
						|
    can_choose_commander: bool = False
 | 
						|
    commander_defense_boost_multiplier: int = 0
 | 
						|
    income_boost_multiplier: int = 0
 | 
						|
    starting_groove_multiplier: float
 | 
						|
    has_death_link: bool = False
 | 
						|
    has_sacrifice_summon: bool = True
 | 
						|
    player_stored_units_key: str = ""
 | 
						|
    ai_stored_units_key: str = ""
 | 
						|
    max_stored_units: int = 1000
 | 
						|
    faction_item_ids = {
 | 
						|
        'Starter': 0,
 | 
						|
        'Cherrystone': 52025,
 | 
						|
        'Felheim': 52026,
 | 
						|
        'Floran': 52027,
 | 
						|
        'Heavensong': 52028,
 | 
						|
        'Requiem': 52029,
 | 
						|
        'Outlaw': 52030
 | 
						|
    }
 | 
						|
    buff_item_ids = {
 | 
						|
        'Income Boost': 52023,
 | 
						|
        'Commander Defense Boost': 52024,
 | 
						|
    }
 | 
						|
    unit_classes = {
 | 
						|
        "archer",
 | 
						|
        "ballista",
 | 
						|
        "balloon",
 | 
						|
        "dog",
 | 
						|
        "dragon",
 | 
						|
        "giant",
 | 
						|
        "harpoonship",
 | 
						|
        "harpy",
 | 
						|
        "knight",
 | 
						|
        "mage",
 | 
						|
        "merman",
 | 
						|
        "rifleman",
 | 
						|
        "soldier",
 | 
						|
        "spearman",
 | 
						|
        "thief",
 | 
						|
        "thief_with_gold",
 | 
						|
        "travelboat",
 | 
						|
        "trebuchet",
 | 
						|
        "turtle",
 | 
						|
        "villager",
 | 
						|
        "wagon",
 | 
						|
        "warship",
 | 
						|
        "witch",
 | 
						|
    }
 | 
						|
 | 
						|
    def __init__(self, server_address, password):
 | 
						|
        super(WargrooveContext, self).__init__(server_address, password)
 | 
						|
        self.send_index: int = 0
 | 
						|
        self.syncing = False
 | 
						|
        self.awaiting_bridge = False
 | 
						|
        # self.game_communication_path: files go in this path to pass data between us and the actual game
 | 
						|
        game_options = WargrooveWorld.settings
 | 
						|
 | 
						|
        # Validate the AppData directory with Wargroove save data.
 | 
						|
        # By default, Windows sets an environment variable we can leverage.
 | 
						|
        # However, other OSes don't usually have this value set, so we need to rely on a settings value instead.
 | 
						|
        appdata_wargroove = None
 | 
						|
        if "appdata" in os.environ:
 | 
						|
            appdata_wargroove = os.environ['appdata']
 | 
						|
        else:
 | 
						|
            try:
 | 
						|
                appdata_wargroove = game_options.save_directory
 | 
						|
            except FileNotFoundError:
 | 
						|
                print_error_and_close("WargrooveClient couldn't detect a path to the AppData folder.\n"
 | 
						|
                                      "Unable to infer required game_communication_path.\n"
 | 
						|
                                      "Try setting the \"save_directory\" value in your local options file "
 | 
						|
                                      "to the AppData folder containing your Wargroove saves.")
 | 
						|
        appdata_wargroove = os.path.expandvars(os.path.join(appdata_wargroove, "Chucklefish", "Wargroove"))
 | 
						|
        if not os.path.isdir(appdata_wargroove):
 | 
						|
            print_error_and_close(f"WargrooveClient couldn't find Wargroove data in your AppData folder.\n"
 | 
						|
                                  f"Looked in \"{appdata_wargroove}\".\n"
 | 
						|
                                  f"If you haven't yet booted the game at least once, boot Wargroove "
 | 
						|
                                  f"and then close it to attempt to fix this error.\n"
 | 
						|
                                  f"If the AppData folder above seems wrong, try setting the "
 | 
						|
                                  f"\"save_directory\" value in your local options file "
 | 
						|
                                  f"to the AppData folder containing your Wargroove saves.")
 | 
						|
 | 
						|
        # Check for the Wargroove game executable path.
 | 
						|
        # This should always be set regardless of the OS.
 | 
						|
        root_directory = game_options["root_directory"]
 | 
						|
        if not os.path.isfile(os.path.join(root_directory, "win64_bin", "wargroove64.exe")):
 | 
						|
            print_error_and_close(f"WargrooveClient couldn't find wargroove64.exe in "
 | 
						|
                                  f"\"{root_directory}/win64_bin/\".\n"
 | 
						|
                                  f"Unable to infer required game_communication_path.\n"
 | 
						|
                                  f"Please verify the \"root_directory\" value in your local "
 | 
						|
                                  f"options file is set correctly.")
 | 
						|
        self.game_communication_path = os.path.join(root_directory, "AP")
 | 
						|
        if not os.path.exists(self.game_communication_path):
 | 
						|
            os.makedirs(self.game_communication_path)
 | 
						|
        self.remove_communication_files()
 | 
						|
        atexit.register(self.remove_communication_files)
 | 
						|
        if not os.path.isdir(appdata_wargroove):
 | 
						|
            print_error_and_close("WargrooveClient couldn't find Wargoove in appdata!"
 | 
						|
                                  "Boot Wargroove and then close it to attempt to fix this error")
 | 
						|
        mods_directory = os.path.join(appdata_wargroove, "mods", "ArchipelagoMod")
 | 
						|
        save_directory = os.path.join(appdata_wargroove, "save")
 | 
						|
 | 
						|
        # Wargroove doesn't always create the mods directory, so we have to do it
 | 
						|
        if not os.path.isdir(mods_directory):
 | 
						|
            os.makedirs(mods_directory)
 | 
						|
        resources = ["data/mods/ArchipelagoMod/maps.dat",
 | 
						|
                     "data/mods/ArchipelagoMod/mod.dat",
 | 
						|
                     "data/mods/ArchipelagoMod/modAssets.dat",
 | 
						|
                     "data/save/campaign-c40a6e5b0cdf86ddac03b276691c483d.cmp",
 | 
						|
                     "data/save/campaign-c40a6e5b0cdf86ddac03b276691c483d.cmp.bak"]
 | 
						|
        file_paths = [os.path.join(mods_directory, "maps.dat"),
 | 
						|
                      os.path.join(mods_directory, "mod.dat"),
 | 
						|
                      os.path.join(mods_directory, "modAssets.dat"),
 | 
						|
                      os.path.join(save_directory, "campaign-c40a6e5b0cdf86ddac03b276691c483d.cmp"),
 | 
						|
                      os.path.join(save_directory, "campaign-c40a6e5b0cdf86ddac03b276691c483d.cmp.bak")]
 | 
						|
        for resource, destination in zip(resources, file_paths):
 | 
						|
            file_data = pkgutil.get_data("worlds.wargroove", resource)
 | 
						|
            if file_data is None:
 | 
						|
                print_error_and_close("WargrooveClient couldn't find Wargoove mod and save files in install!")
 | 
						|
            with open(destination, 'wb') as f:
 | 
						|
                f.write(file_data)
 | 
						|
 | 
						|
    def on_deathlink(self, data: typing.Dict[str, typing.Any]) -> None:
 | 
						|
        with open(os.path.join(self.game_communication_path, "deathLinkReceive"), 'w+') as f:
 | 
						|
            text = data.get("cause", "")
 | 
						|
            if text:
 | 
						|
                f.write(f"DeathLink: {text}")
 | 
						|
            else:
 | 
						|
                f.write(f"DeathLink: Received from {data['source']}")
 | 
						|
        super(WargrooveContext, self).on_deathlink(data)
 | 
						|
 | 
						|
    async def server_auth(self, password_requested: bool = False):
 | 
						|
        if password_requested and not self.password:
 | 
						|
            await super(WargrooveContext, self).server_auth(password_requested)
 | 
						|
        await self.get_username()
 | 
						|
        await self.send_connect()
 | 
						|
 | 
						|
    async def connection_closed(self):
 | 
						|
        await super(WargrooveContext, self).connection_closed()
 | 
						|
        self.remove_communication_files()
 | 
						|
        self.checked_locations.clear()
 | 
						|
        self.server_locations.clear()
 | 
						|
        self.finished_game = False
 | 
						|
 | 
						|
    @property
 | 
						|
    def endpoints(self):
 | 
						|
        if self.server:
 | 
						|
            return [self.server]
 | 
						|
        else:
 | 
						|
            return []
 | 
						|
 | 
						|
    async def shutdown(self):
 | 
						|
        await super(WargrooveContext, self).shutdown()
 | 
						|
        self.remove_communication_files()
 | 
						|
        self.checked_locations.clear()
 | 
						|
        self.server_locations.clear()
 | 
						|
        self.finished_game = False
 | 
						|
 | 
						|
    def remove_communication_files(self):
 | 
						|
        for root, dirs, files in os.walk(self.game_communication_path):
 | 
						|
            for file in files:
 | 
						|
                os.remove(root + "/" + file)
 | 
						|
 | 
						|
    def on_package(self, cmd: str, args: dict):
 | 
						|
        if cmd in {"Connected"}:
 | 
						|
            slot_data = args["slot_data"]
 | 
						|
            self.has_death_link = slot_data.get("death_link", False)
 | 
						|
            filename = f"AP_settings.json"
 | 
						|
            with open(os.path.join(self.game_communication_path, filename), 'w') as f:
 | 
						|
                json.dump(slot_data, f)
 | 
						|
                self.can_choose_commander = slot_data["can_choose_commander"]
 | 
						|
                print('can choose commander:', self.can_choose_commander)
 | 
						|
                self.starting_groove_multiplier = slot_data["starting_groove_multiplier"]
 | 
						|
                self.income_boost_multiplier = slot_data["income_boost"]
 | 
						|
                self.commander_defense_boost_multiplier = slot_data["commander_defense_boost"]
 | 
						|
            for ss in self.checked_locations:
 | 
						|
                filename = f"send{ss}"
 | 
						|
                with open(os.path.join(self.game_communication_path, filename), 'w') as f:
 | 
						|
                    pass
 | 
						|
 | 
						|
            self.player_stored_units_key = f"wargroove_player_units_{self.team}"
 | 
						|
            self.ai_stored_units_key = f"wargroove_ai_units_{self.team}"
 | 
						|
            self.set_notify(self.player_stored_units_key, self.ai_stored_units_key)
 | 
						|
 | 
						|
            self.update_commander_data()
 | 
						|
            self.ui.update_tracker()
 | 
						|
 | 
						|
            random.seed(self.seed_name + str(self.slot))
 | 
						|
            # Our indexes start at 1 and we have 24 levels
 | 
						|
            for i in range(1, 25):
 | 
						|
                filename = f"seed{i}"
 | 
						|
                with open(os.path.join(self.game_communication_path, filename), 'w') as f:
 | 
						|
                    f.write(str(random.randint(0, 4294967295)))
 | 
						|
 | 
						|
        if cmd in {"RoomInfo"}:
 | 
						|
            self.seed_name = args["seed_name"]
 | 
						|
 | 
						|
        if cmd in {"ReceivedItems"}:
 | 
						|
            received_ids = [item.item for item in self.items_received]
 | 
						|
            for network_item in self.items_received:
 | 
						|
                filename = f"AP_{str(network_item.item)}.item"
 | 
						|
                path = os.path.join(self.game_communication_path, filename)
 | 
						|
 | 
						|
                # Newly-obtained items
 | 
						|
                if not os.path.isfile(path):
 | 
						|
                    open(path, 'w').close()
 | 
						|
                    # Announcing commander unlocks
 | 
						|
                    item_name = self.item_names.lookup_in_game(network_item.item)
 | 
						|
                    if item_name in faction_table.keys():
 | 
						|
                        for commander in faction_table[item_name]:
 | 
						|
                            logger.info(f"{commander.name} has been unlocked!")
 | 
						|
 | 
						|
                with open(path, 'w') as f:
 | 
						|
                    item_count = received_ids.count(network_item.item)
 | 
						|
                    if self.buff_item_ids["Income Boost"] == network_item.item:
 | 
						|
                        f.write(f"{item_count * self.income_boost_multiplier}")
 | 
						|
                    elif self.buff_item_ids["Commander Defense Boost"] == network_item.item:
 | 
						|
                        f.write(f"{item_count * self.commander_defense_boost_multiplier}")
 | 
						|
                    else:
 | 
						|
                        f.write(f"{item_count}")
 | 
						|
 | 
						|
                print_filename = f"AP_{str(network_item.item)}.item.print"
 | 
						|
                print_path = os.path.join(self.game_communication_path, print_filename)
 | 
						|
                if not os.path.isfile(print_path):
 | 
						|
                    open(print_path, 'w').close()
 | 
						|
                    with open(print_path, 'w') as f:
 | 
						|
                        f.write("Received " +
 | 
						|
                                self.item_names.lookup_in_game(network_item.item) +
 | 
						|
                                " from " +
 | 
						|
                                self.player_names[network_item.player])
 | 
						|
            self.update_commander_data()
 | 
						|
            self.ui.update_tracker()
 | 
						|
 | 
						|
        if cmd in {"RoomUpdate"}:
 | 
						|
            if "checked_locations" in args:
 | 
						|
                for ss in self.checked_locations:
 | 
						|
                    filename = f"send{ss}"
 | 
						|
                    with open(os.path.join(self.game_communication_path, filename), 'w') as f:
 | 
						|
                        pass
 | 
						|
 | 
						|
    def run_gui(self):
 | 
						|
        """Import kivy UI system and start running it as self.ui_task."""
 | 
						|
        from kvui import GameManager, HoverBehavior, ServerToolTip
 | 
						|
        from kivymd.uix.tab import MDTabsItem, MDTabsItemText
 | 
						|
        from kivy.lang import Builder
 | 
						|
        from kivy.uix.togglebutton import ToggleButton
 | 
						|
        from kivy.uix.boxlayout import BoxLayout
 | 
						|
        from kivy.uix.label import Label
 | 
						|
        import pkgutil
 | 
						|
 | 
						|
        class TrackerLayout(BoxLayout):
 | 
						|
            pass
 | 
						|
 | 
						|
        class CommanderSelect(BoxLayout):
 | 
						|
            pass
 | 
						|
 | 
						|
        class CommanderButton(ToggleButton):
 | 
						|
            pass
 | 
						|
 | 
						|
        class FactionBox(BoxLayout):
 | 
						|
            pass
 | 
						|
 | 
						|
        class CommanderGroup(BoxLayout):
 | 
						|
            pass
 | 
						|
 | 
						|
        class ItemTracker(BoxLayout):
 | 
						|
            pass
 | 
						|
 | 
						|
        class ItemLabel(Label):
 | 
						|
            pass
 | 
						|
 | 
						|
        class WargrooveManager(GameManager):
 | 
						|
            logging_pairs = [
 | 
						|
                ("Client", "Archipelago"),
 | 
						|
                ("WG", "WG Console"),
 | 
						|
            ]
 | 
						|
            base_title = "Archipelago Wargroove Client"
 | 
						|
            ctx: WargrooveContext
 | 
						|
            unit_tracker: ItemTracker
 | 
						|
            trigger_tracker: BoxLayout
 | 
						|
            boost_tracker: BoxLayout
 | 
						|
            commander_buttons: Dict[int, List[CommanderButton]]
 | 
						|
            tracker_items = {
 | 
						|
                "Swordsman": ItemData(None, "Unit", False),
 | 
						|
                "Dog": ItemData(None, "Unit", False),
 | 
						|
                **item_table
 | 
						|
            }
 | 
						|
 | 
						|
            def build(self):
 | 
						|
                container = super().build()
 | 
						|
                self.add_client_tab("Wargroove", self.build_tracker())
 | 
						|
                return container
 | 
						|
 | 
						|
            def build_tracker(self) -> TrackerLayout:
 | 
						|
                try:
 | 
						|
                    tracker = TrackerLayout(orientation="horizontal")
 | 
						|
                    commander_select = CommanderSelect(orientation="vertical")
 | 
						|
                    self.commander_buttons = {}
 | 
						|
 | 
						|
                    for faction, commanders in faction_table.items():
 | 
						|
                        faction_box = FactionBox(size_hint=(None, None), width=100 * len(commanders), height=70)
 | 
						|
                        commander_group = CommanderGroup()
 | 
						|
                        commander_buttons = []
 | 
						|
                        for commander in commanders:
 | 
						|
                            commander_button = CommanderButton(text=commander.name, group="commanders")
 | 
						|
                            if faction == "Starter":
 | 
						|
                                commander_button.disabled = False
 | 
						|
                            commander_button.bind(on_press=lambda instance: self.ctx.set_commander(instance.text))
 | 
						|
                            commander_buttons.append(commander_button)
 | 
						|
                            commander_group.add_widget(commander_button)
 | 
						|
                        self.commander_buttons[faction] = commander_buttons
 | 
						|
                        faction_box.add_widget(Label(text=faction, size_hint_x=None, pos_hint={'left': 1}, size_hint_y=None, height=10))
 | 
						|
                        faction_box.add_widget(commander_group)
 | 
						|
                        commander_select.add_widget(faction_box)
 | 
						|
                    item_tracker = ItemTracker(padding=[0,20])
 | 
						|
                    self.unit_tracker = BoxLayout(orientation="vertical")
 | 
						|
                    other_tracker = BoxLayout(orientation="vertical")
 | 
						|
                    self.trigger_tracker = BoxLayout(orientation="vertical")
 | 
						|
                    self.boost_tracker = BoxLayout(orientation="vertical")
 | 
						|
                    other_tracker.add_widget(self.trigger_tracker)
 | 
						|
                    other_tracker.add_widget(self.boost_tracker)
 | 
						|
                    item_tracker.add_widget(self.unit_tracker)
 | 
						|
                    item_tracker.add_widget(other_tracker)
 | 
						|
                    tracker.add_widget(commander_select)
 | 
						|
                    tracker.add_widget(item_tracker)
 | 
						|
                    self.update_tracker()
 | 
						|
                    return tracker
 | 
						|
                except Exception as e:
 | 
						|
                    print(e)
 | 
						|
 | 
						|
            def update_tracker(self):
 | 
						|
                received_ids = [item.item for item in self.ctx.items_received]
 | 
						|
                for faction, item_id in self.ctx.faction_item_ids.items():
 | 
						|
                    for commander_button in self.commander_buttons[faction]:
 | 
						|
                        commander_button.disabled = not (faction == "Starter" or item_id in received_ids)
 | 
						|
                self.unit_tracker.clear_widgets()
 | 
						|
                self.trigger_tracker.clear_widgets()
 | 
						|
                for name, item in self.tracker_items.items():
 | 
						|
                    if item.type in ("Unit", "Trigger"):
 | 
						|
                        status_color = (1, 1, 1, 1) if item.code is None or item.code in received_ids else (0.6, 0.2, 0.2, 1)
 | 
						|
                        label = ItemLabel(text=name, color=status_color)
 | 
						|
                        if item.type == "Unit":
 | 
						|
                            self.unit_tracker.add_widget(label)
 | 
						|
                        else:
 | 
						|
                            self.trigger_tracker.add_widget(label)
 | 
						|
                self.boost_tracker.clear_widgets()
 | 
						|
                extra_income = received_ids.count(52023) * self.ctx.income_boost_multiplier
 | 
						|
                extra_defense = received_ids.count(52024) * self.ctx.commander_defense_boost_multiplier
 | 
						|
                income_boost = ItemLabel(text="Extra Income: " + str(extra_income))
 | 
						|
                defense_boost = ItemLabel(text="Comm Defense: " + str(100 + extra_defense))
 | 
						|
                self.boost_tracker.add_widget(income_boost)
 | 
						|
                self.boost_tracker.add_widget(defense_boost)
 | 
						|
 | 
						|
        self.ui = WargrooveManager(self)
 | 
						|
        data = pkgutil.get_data(WargrooveWorld.__module__, "Wargroove.kv").decode()
 | 
						|
        Builder.load_string(data)
 | 
						|
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
 | 
						|
 | 
						|
    def update_commander_data(self):
 | 
						|
        if self.can_choose_commander:
 | 
						|
            faction_items = 0
 | 
						|
            faction_item_names = [faction + ' Commanders' for faction in faction_table.keys()]
 | 
						|
            for network_item in self.items_received:
 | 
						|
                if self.item_names.lookup_in_game(network_item.item) in faction_item_names:
 | 
						|
                    faction_items += 1
 | 
						|
            starting_groove = (faction_items - 1) * self.starting_groove_multiplier
 | 
						|
            # Must be an integer larger than 0
 | 
						|
            starting_groove = int(max(starting_groove, 0))
 | 
						|
            data = {
 | 
						|
                "commander": self.current_commander.internal_name,
 | 
						|
                "starting_groove": starting_groove
 | 
						|
            }
 | 
						|
        else:
 | 
						|
            data = {
 | 
						|
                "commander": "seed",
 | 
						|
                "starting_groove": 0
 | 
						|
            }
 | 
						|
        filename = 'commander.json'
 | 
						|
        with open(os.path.join(self.game_communication_path, filename), 'w') as f:
 | 
						|
            json.dump(data, f)
 | 
						|
        if self.ui:
 | 
						|
            self.ui.update_tracker()
 | 
						|
 | 
						|
    def set_commander(self, commander_name: str) -> bool:
 | 
						|
        """Sets the current commander to the given one, if possible"""
 | 
						|
        if not self.can_choose_commander:
 | 
						|
            wg_logger.error("Cannot set commanders in this game mode.")
 | 
						|
            return
 | 
						|
        match_name = commander_name.lower()
 | 
						|
        for commander, unlocked in self.get_commanders():
 | 
						|
            if commander.name.lower() == match_name or commander.alt_name and commander.alt_name.lower() == match_name:
 | 
						|
                if unlocked:
 | 
						|
                    self.current_commander = commander
 | 
						|
                    self.syncing = True
 | 
						|
                    wg_logger.info(f"Commander set to {commander.name}.")
 | 
						|
                    self.update_commander_data()
 | 
						|
                    return True
 | 
						|
                else:
 | 
						|
                    wg_logger.error(f"Commander {commander.name} has not been unlocked.")
 | 
						|
                    return False
 | 
						|
        else:
 | 
						|
            wg_logger.error(f"{commander_name} is not a recognized Wargroove commander.")
 | 
						|
 | 
						|
    def get_commanders(self) -> List[Tuple[CommanderData, bool]]:
 | 
						|
        """Gets a list of commanders with their unlocked status"""
 | 
						|
        commanders = []
 | 
						|
        received_ids = [item.item for item in self.items_received]
 | 
						|
        for faction in faction_table.keys():
 | 
						|
            unlocked = faction == 'Starter' or self.faction_item_ids[faction] in received_ids
 | 
						|
            commanders += [(commander, unlocked) for commander in faction_table[faction]]
 | 
						|
        return commanders
 | 
						|
 | 
						|
 | 
						|
async def game_watcher(ctx: WargrooveContext):
 | 
						|
    while not ctx.exit_event.is_set():
 | 
						|
        if ctx.syncing == True:
 | 
						|
            sync_msg = [{'cmd': 'Sync'}]
 | 
						|
            if ctx.locations_checked:
 | 
						|
                sync_msg.append({"cmd": "LocationChecks", "locations": list(ctx.locations_checked)})
 | 
						|
            await ctx.send_msgs(sync_msg)
 | 
						|
            ctx.syncing = False
 | 
						|
        sending = []
 | 
						|
        victory = False
 | 
						|
        for root, dirs, files in os.walk(ctx.game_communication_path):
 | 
						|
            for file in files:
 | 
						|
                if file == "deathLinkSend" and ctx.has_death_link:
 | 
						|
                    with open(os.path.join(ctx.game_communication_path, file), 'r') as f:
 | 
						|
                        failed_mission = f.read()
 | 
						|
                        if ctx.slot is not None:
 | 
						|
                            await ctx.send_death(f"{ctx.player_names[ctx.slot]} failed {failed_mission}")
 | 
						|
                    os.remove(os.path.join(ctx.game_communication_path, file))
 | 
						|
                if file.find("send") > -1:
 | 
						|
                    st = file.split("send", -1)[1]
 | 
						|
                    sending = sending+[(int(st))]
 | 
						|
                    os.remove(os.path.join(ctx.game_communication_path, file))
 | 
						|
                if file.find("victory") > -1:
 | 
						|
                    victory = True
 | 
						|
                    os.remove(os.path.join(ctx.game_communication_path, file))
 | 
						|
                if file == "unitSacrifice" or file == "unitSacrificeAI":
 | 
						|
                    if ctx.has_sacrifice_summon:
 | 
						|
                        stored_units_key = ctx.player_stored_units_key
 | 
						|
                        if file == "unitSacrificeAI":
 | 
						|
                            stored_units_key = ctx.ai_stored_units_key
 | 
						|
                        with open(os.path.join(ctx.game_communication_path, file), 'r') as f:
 | 
						|
                            unit_class = f.read()
 | 
						|
                            message = [{"cmd": 'Set', "key": stored_units_key,
 | 
						|
                                        "default": [],
 | 
						|
                                        "want_reply": True,
 | 
						|
                                        "operations": [{"operation": "add", "value": [unit_class[:64]]}]}]
 | 
						|
                            await ctx.send_msgs(message)
 | 
						|
                    os.remove(os.path.join(ctx.game_communication_path, file))
 | 
						|
                if file == "unitSummonRequestAI" or file == "unitSummonRequest":
 | 
						|
                    if ctx.has_sacrifice_summon:
 | 
						|
                        stored_units_key = ctx.player_stored_units_key
 | 
						|
                        if file == "unitSummonRequestAI":
 | 
						|
                            stored_units_key = ctx.ai_stored_units_key
 | 
						|
                        with open(os.path.join(ctx.game_communication_path, "unitSummonResponse"), 'w') as f:
 | 
						|
                            if stored_units_key in ctx.stored_data:
 | 
						|
                                stored_units = ctx.stored_data[stored_units_key]
 | 
						|
                                if stored_units is None:
 | 
						|
                                    stored_units = []
 | 
						|
                                wg1_stored_units = [unit for unit in stored_units if unit in ctx.unit_classes]
 | 
						|
                                if len(wg1_stored_units) != 0:
 | 
						|
                                    summoned_unit = random.choice(wg1_stored_units)
 | 
						|
                                    message = [{"cmd": 'Set', "key": stored_units_key,
 | 
						|
                                                "default": [],
 | 
						|
                                                "want_reply": True,
 | 
						|
                                                "operations": [{"operation": "remove", "value": summoned_unit[:64]}]}]
 | 
						|
                                    await ctx.send_msgs(message)
 | 
						|
                                    f.write(summoned_unit)
 | 
						|
                    os.remove(os.path.join(ctx.game_communication_path, file))
 | 
						|
 | 
						|
        ctx.locations_checked = sending
 | 
						|
        message = [{"cmd": 'LocationChecks', "locations": sending}]
 | 
						|
        await ctx.send_msgs(message)
 | 
						|
        if not ctx.finished_game and victory:
 | 
						|
            await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
 | 
						|
            ctx.finished_game = True
 | 
						|
        await asyncio.sleep(0.1)
 | 
						|
 | 
						|
 | 
						|
def print_error_and_close(msg):
 | 
						|
    logger.error("Error: " + msg)
 | 
						|
    Utils.messagebox("Error", msg, error=True)
 | 
						|
    sys.exit(1)
 | 
						|
 | 
						|
def launch(*launch_args: str):
 | 
						|
    async def main():
 | 
						|
        args = parser.parse_args(launch_args)
 | 
						|
        ctx = WargrooveContext(args.connect, args.password)
 | 
						|
        ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
 | 
						|
        if gui_enabled:
 | 
						|
            ctx.run_gui()
 | 
						|
        ctx.run_cli()
 | 
						|
        progression_watcher = asyncio.create_task(
 | 
						|
            game_watcher(ctx), name="WargrooveProgressionWatcher")
 | 
						|
 | 
						|
        await ctx.exit_event.wait()
 | 
						|
        ctx.server_address = None
 | 
						|
 | 
						|
        await progression_watcher
 | 
						|
 | 
						|
        await ctx.shutdown()
 | 
						|
 | 
						|
    import colorama
 | 
						|
 | 
						|
    parser = get_base_parser(description="Wargroove Client, for text interfacing.")
 | 
						|
 | 
						|
    colorama.just_fix_windows_console()
 | 
						|
    asyncio.run(main())
 | 
						|
    colorama.deinit()
 |