| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  | from __future__ import annotations | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  | import atexit | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  | import os | 
					
						
							|  |  |  | import sys | 
					
						
							|  |  |  | import asyncio | 
					
						
							|  |  |  | import random | 
					
						
							|  |  |  | import shutil | 
					
						
							|  |  |  | from typing import Tuple, List, Iterable, Dict | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from worlds.wargroove import WargrooveWorld | 
					
						
							|  |  |  | from worlds.wargroove.Items import item_table, faction_table, CommanderData, ItemData | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import ModuleUpdate | 
					
						
							|  |  |  | ModuleUpdate.update() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | import Utils | 
					
						
							|  |  |  | import json | 
					
						
							|  |  |  | import logging | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == "__main__": | 
					
						
							|  |  |  |     Utils.init_logging("WargrooveClient", exception_logger="Client") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | from NetUtils import NetworkItem, ClientStatus | 
					
						
							|  |  |  | from CommonClient import gui_enabled, logger, get_base_parser, ClientCommandProcessor, \ | 
					
						
							|  |  |  |     CommonContext, server_loop | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | wg_logger = logging.getLogger("WG") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class WargrooveClientCommandProcessor(ClientCommandProcessor): | 
					
						
							|  |  |  |     def _cmd_resync(self): | 
					
						
							|  |  |  |         """Manually trigger a resync.""" | 
					
						
							|  |  |  |         self.output(f"Syncing items.") | 
					
						
							|  |  |  |         self.ctx.syncing = True | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def _cmd_commander(self, *commander_name: Iterable[str]): | 
					
						
							|  |  |  |         """Set the current commander to the given commander.""" | 
					
						
							|  |  |  |         if commander_name: | 
					
						
							|  |  |  |             self.ctx.set_commander(' '.join(commander_name)) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             if self.ctx.can_choose_commander: | 
					
						
							|  |  |  |                 commanders = self.ctx.get_commanders() | 
					
						
							|  |  |  |                 wg_logger.info('Unlocked commanders: ' + | 
					
						
							|  |  |  |                                ', '.join((commander.name for commander, unlocked in commanders if unlocked))) | 
					
						
							|  |  |  |                 wg_logger.info('Locked commanders: ' + | 
					
						
							|  |  |  |                                ', '.join((commander.name for commander, unlocked in commanders if not unlocked))) | 
					
						
							|  |  |  |             else: | 
					
						
							|  |  |  |                 wg_logger.error('Cannot set commanders in this game mode.') | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | class WargrooveContext(CommonContext): | 
					
						
							|  |  |  |     command_processor: int = WargrooveClientCommandProcessor | 
					
						
							|  |  |  |     game = "Wargroove" | 
					
						
							|  |  |  |     items_handling = 0b111  # full remote | 
					
						
							|  |  |  |     current_commander: CommanderData = faction_table["Starter"][0] | 
					
						
							|  |  |  |     can_choose_commander: bool = False | 
					
						
							|  |  |  |     commander_defense_boost_multiplier: int = 0 | 
					
						
							|  |  |  |     income_boost_multiplier: int = 0 | 
					
						
							|  |  |  |     starting_groove_multiplier: float | 
					
						
							|  |  |  |     faction_item_ids = { | 
					
						
							|  |  |  |         'Starter': 0, | 
					
						
							|  |  |  |         'Cherrystone': 52025, | 
					
						
							|  |  |  |         'Felheim': 52026, | 
					
						
							|  |  |  |         'Floran': 52027, | 
					
						
							|  |  |  |         'Heavensong': 52028, | 
					
						
							|  |  |  |         'Requiem': 52029, | 
					
						
							|  |  |  |         'Outlaw': 52030 | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  |     buff_item_ids = { | 
					
						
							|  |  |  |         'Income Boost': 52023, | 
					
						
							|  |  |  |         'Commander Defense Boost': 52024, | 
					
						
							|  |  |  |     } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def __init__(self, server_address, password): | 
					
						
							|  |  |  |         super(WargrooveContext, self).__init__(server_address, password) | 
					
						
							|  |  |  |         self.send_index: int = 0 | 
					
						
							|  |  |  |         self.syncing = False | 
					
						
							|  |  |  |         self.awaiting_bridge = False | 
					
						
							|  |  |  |         # self.game_communication_path: files go in this path to pass data between us and the actual game | 
					
						
							|  |  |  |         if "appdata" in os.environ: | 
					
						
							|  |  |  |             options = Utils.get_options() | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  |             root_directory = os.path.join(options["wargroove_options"]["root_directory"]) | 
					
						
							|  |  |  |             data_directory = os.path.join("lib", "worlds", "wargroove", "data") | 
					
						
							|  |  |  |             dev_data_directory = os.path.join("worlds", "wargroove", "data") | 
					
						
							|  |  |  |             appdata_wargroove = os.path.expandvars(os.path.join("%APPDATA%", "Chucklefish", "Wargroove")) | 
					
						
							|  |  |  |             if not os.path.isfile(os.path.join(root_directory, "win64_bin", "wargroove64.exe")): | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |                 print_error_and_close("WargrooveClient couldn't find wargroove64.exe. " | 
					
						
							|  |  |  |                                       "Unable to infer required game_communication_path") | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  |             self.game_communication_path = os.path.join(root_directory, "AP") | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |             if not os.path.exists(self.game_communication_path): | 
					
						
							|  |  |  |                 os.makedirs(self.game_communication_path) | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  |             self.remove_communication_files() | 
					
						
							|  |  |  |             atexit.register(self.remove_communication_files) | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |             if not os.path.isdir(appdata_wargroove): | 
					
						
							|  |  |  |                 print_error_and_close("WargrooveClient couldn't find Wargoove in appdata!" | 
					
						
							|  |  |  |                                       "Boot Wargroove and then close it to attempt to fix this error") | 
					
						
							|  |  |  |             if not os.path.isdir(data_directory): | 
					
						
							|  |  |  |                 data_directory = dev_data_directory | 
					
						
							|  |  |  |             if not os.path.isdir(data_directory): | 
					
						
							|  |  |  |                 print_error_and_close("WargrooveClient couldn't find Wargoove mod and save files in install!") | 
					
						
							|  |  |  |             shutil.copytree(data_directory, appdata_wargroove, dirs_exist_ok=True) | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             print_error_and_close("WargrooveClient couldn't detect system type. " | 
					
						
							|  |  |  |                                   "Unable to infer required game_communication_path") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def server_auth(self, password_requested: bool = False): | 
					
						
							|  |  |  |         if password_requested and not self.password: | 
					
						
							|  |  |  |             await super(WargrooveContext, self).server_auth(password_requested) | 
					
						
							|  |  |  |         await self.get_username() | 
					
						
							|  |  |  |         await self.send_connect() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def connection_closed(self): | 
					
						
							|  |  |  |         await super(WargrooveContext, self).connection_closed() | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  |         self.remove_communication_files() | 
					
						
							| 
									
										
										
										
											2023-11-16 05:35:20 -05:00
										 |  |  |         self.checked_locations.clear() | 
					
						
							|  |  |  |         self.server_locations.clear() | 
					
						
							|  |  |  |         self.finished_game = False | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     @property | 
					
						
							|  |  |  |     def endpoints(self): | 
					
						
							|  |  |  |         if self.server: | 
					
						
							|  |  |  |             return [self.server] | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             return [] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     async def shutdown(self): | 
					
						
							|  |  |  |         await super(WargrooveContext, self).shutdown() | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  |         self.remove_communication_files() | 
					
						
							| 
									
										
										
										
											2023-11-16 05:35:20 -05:00
										 |  |  |         self.checked_locations.clear() | 
					
						
							|  |  |  |         self.server_locations.clear() | 
					
						
							|  |  |  |         self.finished_game = False | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def remove_communication_files(self): | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |         for root, dirs, files in os.walk(self.game_communication_path): | 
					
						
							|  |  |  |             for file in files: | 
					
						
							| 
									
										
										
										
											2023-03-03 12:24:09 -05:00
										 |  |  |                 os.remove(root + "/" + file) | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  | 
 | 
					
						
							|  |  |  |     def on_package(self, cmd: str, args: dict): | 
					
						
							|  |  |  |         if cmd in {"Connected"}: | 
					
						
							|  |  |  |             filename = f"AP_settings.json" | 
					
						
							|  |  |  |             with open(os.path.join(self.game_communication_path, filename), 'w') as f: | 
					
						
							|  |  |  |                 slot_data = args["slot_data"] | 
					
						
							|  |  |  |                 json.dump(args["slot_data"], f) | 
					
						
							|  |  |  |                 self.can_choose_commander = slot_data["can_choose_commander"] | 
					
						
							|  |  |  |                 print('can choose commander:', self.can_choose_commander) | 
					
						
							|  |  |  |                 self.starting_groove_multiplier = slot_data["starting_groove_multiplier"] | 
					
						
							|  |  |  |                 self.income_boost_multiplier = slot_data["income_boost"] | 
					
						
							|  |  |  |                 self.commander_defense_boost_multiplier = slot_data["commander_defense_boost"] | 
					
						
							|  |  |  |                 f.close() | 
					
						
							|  |  |  |             for ss in self.checked_locations: | 
					
						
							|  |  |  |                 filename = f"send{ss}" | 
					
						
							|  |  |  |                 with open(os.path.join(self.game_communication_path, filename), 'w') as f: | 
					
						
							|  |  |  |                     f.close() | 
					
						
							|  |  |  |             self.update_commander_data() | 
					
						
							|  |  |  |             self.ui.update_tracker() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             random.seed(self.seed_name + str(self.slot)) | 
					
						
							|  |  |  |             # Our indexes start at 1 and we have 24 levels | 
					
						
							|  |  |  |             for i in range(1, 25): | 
					
						
							|  |  |  |                 filename = f"seed{i}" | 
					
						
							|  |  |  |                 with open(os.path.join(self.game_communication_path, filename), 'w') as f: | 
					
						
							|  |  |  |                     f.write(str(random.randint(0, 4294967295))) | 
					
						
							|  |  |  |                     f.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if cmd in {"RoomInfo"}: | 
					
						
							|  |  |  |             self.seed_name = args["seed_name"] | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if cmd in {"ReceivedItems"}: | 
					
						
							|  |  |  |             received_ids = [item.item for item in self.items_received] | 
					
						
							|  |  |  |             for network_item in self.items_received: | 
					
						
							|  |  |  |                 filename = f"AP_{str(network_item.item)}.item" | 
					
						
							|  |  |  |                 path = os.path.join(self.game_communication_path, filename) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 # Newly-obtained items | 
					
						
							|  |  |  |                 if not os.path.isfile(path): | 
					
						
							|  |  |  |                     open(path, 'w').close() | 
					
						
							|  |  |  |                     # Announcing commander unlocks | 
					
						
							| 
									
										
										
										
											2024-06-01 06:07:13 -05:00
										 |  |  |                     item_name = self.item_names.lookup_in_slot(network_item.item) | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |                     if item_name in faction_table.keys(): | 
					
						
							|  |  |  |                         for commander in faction_table[item_name]: | 
					
						
							|  |  |  |                             logger.info(f"{commander.name} has been unlocked!") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 with open(path, 'w') as f: | 
					
						
							|  |  |  |                     item_count = received_ids.count(network_item.item) | 
					
						
							|  |  |  |                     if self.buff_item_ids["Income Boost"] == network_item.item: | 
					
						
							|  |  |  |                         f.write(f"{item_count * self.income_boost_multiplier}") | 
					
						
							|  |  |  |                     elif self.buff_item_ids["Commander Defense Boost"] == network_item.item: | 
					
						
							|  |  |  |                         f.write(f"{item_count * self.commander_defense_boost_multiplier}") | 
					
						
							|  |  |  |                     else: | 
					
						
							|  |  |  |                         f.write(f"{item_count}") | 
					
						
							|  |  |  |                     f.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                 print_filename = f"AP_{str(network_item.item)}.item.print" | 
					
						
							|  |  |  |                 print_path = os.path.join(self.game_communication_path, print_filename) | 
					
						
							|  |  |  |                 if not os.path.isfile(print_path): | 
					
						
							|  |  |  |                     open(print_path, 'w').close() | 
					
						
							|  |  |  |                     with open(print_path, 'w') as f: | 
					
						
							|  |  |  |                         f.write("Received " + | 
					
						
							| 
									
										
										
										
											2024-06-01 06:07:13 -05:00
										 |  |  |                                 self.item_names.lookup_in_slot(network_item.item) + | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |                                 " from " + | 
					
						
							|  |  |  |                                 self.player_names[network_item.player]) | 
					
						
							|  |  |  |                         f.close() | 
					
						
							|  |  |  |             self.update_commander_data() | 
					
						
							|  |  |  |             self.ui.update_tracker() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         if cmd in {"RoomUpdate"}: | 
					
						
							|  |  |  |             if "checked_locations" in args: | 
					
						
							|  |  |  |                 for ss in self.checked_locations: | 
					
						
							|  |  |  |                     filename = f"send{ss}" | 
					
						
							|  |  |  |                     with open(os.path.join(self.game_communication_path, filename), 'w') as f: | 
					
						
							|  |  |  |                         f.close() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def run_gui(self): | 
					
						
							|  |  |  |         """Import kivy UI system and start running it as self.ui_task.""" | 
					
						
							|  |  |  |         from kvui import GameManager, HoverBehavior, ServerToolTip | 
					
						
							|  |  |  |         from kivy.uix.tabbedpanel import TabbedPanelItem | 
					
						
							|  |  |  |         from kivy.lang import Builder | 
					
						
							|  |  |  |         from kivy.uix.button import Button | 
					
						
							|  |  |  |         from kivy.uix.togglebutton import ToggleButton | 
					
						
							|  |  |  |         from kivy.uix.boxlayout import BoxLayout | 
					
						
							|  |  |  |         from kivy.uix.gridlayout import GridLayout | 
					
						
							|  |  |  |         from kivy.uix.image import AsyncImage, Image | 
					
						
							|  |  |  |         from kivy.uix.stacklayout import StackLayout | 
					
						
							|  |  |  |         from kivy.uix.label import Label | 
					
						
							|  |  |  |         from kivy.properties import ColorProperty | 
					
						
							|  |  |  |         from kivy.uix.image import Image | 
					
						
							|  |  |  |         import pkgutil | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class TrackerLayout(BoxLayout): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class CommanderSelect(BoxLayout): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class CommanderButton(ToggleButton): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class FactionBox(BoxLayout): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class CommanderGroup(BoxLayout): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class ItemTracker(BoxLayout): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class ItemLabel(Label): | 
					
						
							|  |  |  |             pass | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         class WargrooveManager(GameManager): | 
					
						
							|  |  |  |             logging_pairs = [ | 
					
						
							|  |  |  |                 ("Client", "Archipelago"), | 
					
						
							|  |  |  |                 ("WG", "WG Console"), | 
					
						
							|  |  |  |             ] | 
					
						
							|  |  |  |             base_title = "Archipelago Wargroove Client" | 
					
						
							|  |  |  |             ctx: WargrooveContext | 
					
						
							|  |  |  |             unit_tracker: ItemTracker | 
					
						
							|  |  |  |             trigger_tracker: BoxLayout | 
					
						
							|  |  |  |             boost_tracker: BoxLayout | 
					
						
							|  |  |  |             commander_buttons: Dict[int, List[CommanderButton]] | 
					
						
							|  |  |  |             tracker_items = { | 
					
						
							|  |  |  |                 "Swordsman": ItemData(None, "Unit", False), | 
					
						
							|  |  |  |                 "Dog": ItemData(None, "Unit", False), | 
					
						
							|  |  |  |                 **item_table | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def build(self): | 
					
						
							|  |  |  |                 container = super().build() | 
					
						
							|  |  |  |                 panel = TabbedPanelItem(text="Wargroove") | 
					
						
							|  |  |  |                 panel.content = self.build_tracker() | 
					
						
							|  |  |  |                 self.tabs.add_widget(panel) | 
					
						
							|  |  |  |                 return container | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def build_tracker(self) -> TrackerLayout: | 
					
						
							|  |  |  |                 try: | 
					
						
							|  |  |  |                     tracker = TrackerLayout(orientation="horizontal") | 
					
						
							|  |  |  |                     commander_select = CommanderSelect(orientation="vertical") | 
					
						
							|  |  |  |                     self.commander_buttons = {} | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |                     for faction, commanders in faction_table.items(): | 
					
						
							|  |  |  |                         faction_box = FactionBox(size_hint=(None, None), width=100 * len(commanders), height=70) | 
					
						
							|  |  |  |                         commander_group = CommanderGroup() | 
					
						
							|  |  |  |                         commander_buttons = [] | 
					
						
							|  |  |  |                         for commander in commanders: | 
					
						
							|  |  |  |                             commander_button = CommanderButton(text=commander.name, group="commanders") | 
					
						
							|  |  |  |                             if faction == "Starter": | 
					
						
							|  |  |  |                                 commander_button.disabled = False | 
					
						
							|  |  |  |                             commander_button.bind(on_press=lambda instance: self.ctx.set_commander(instance.text)) | 
					
						
							|  |  |  |                             commander_buttons.append(commander_button) | 
					
						
							|  |  |  |                             commander_group.add_widget(commander_button) | 
					
						
							|  |  |  |                         self.commander_buttons[faction] = commander_buttons | 
					
						
							|  |  |  |                         faction_box.add_widget(Label(text=faction, size_hint_x=None, pos_hint={'left': 1}, size_hint_y=None, height=10)) | 
					
						
							|  |  |  |                         faction_box.add_widget(commander_group) | 
					
						
							|  |  |  |                         commander_select.add_widget(faction_box) | 
					
						
							|  |  |  |                     item_tracker = ItemTracker(padding=[0,20]) | 
					
						
							|  |  |  |                     self.unit_tracker = BoxLayout(orientation="vertical") | 
					
						
							|  |  |  |                     other_tracker = BoxLayout(orientation="vertical") | 
					
						
							|  |  |  |                     self.trigger_tracker = BoxLayout(orientation="vertical") | 
					
						
							|  |  |  |                     self.boost_tracker = BoxLayout(orientation="vertical") | 
					
						
							|  |  |  |                     other_tracker.add_widget(self.trigger_tracker) | 
					
						
							|  |  |  |                     other_tracker.add_widget(self.boost_tracker) | 
					
						
							|  |  |  |                     item_tracker.add_widget(self.unit_tracker) | 
					
						
							|  |  |  |                     item_tracker.add_widget(other_tracker) | 
					
						
							|  |  |  |                     tracker.add_widget(commander_select) | 
					
						
							|  |  |  |                     tracker.add_widget(item_tracker) | 
					
						
							|  |  |  |                     self.update_tracker() | 
					
						
							|  |  |  |                     return tracker | 
					
						
							|  |  |  |                 except Exception as e: | 
					
						
							|  |  |  |                     print(e) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |             def update_tracker(self): | 
					
						
							|  |  |  |                 received_ids = [item.item for item in self.ctx.items_received] | 
					
						
							|  |  |  |                 for faction, item_id in self.ctx.faction_item_ids.items(): | 
					
						
							|  |  |  |                     for commander_button in self.commander_buttons[faction]: | 
					
						
							|  |  |  |                         commander_button.disabled = not (faction == "Starter" or item_id in received_ids) | 
					
						
							|  |  |  |                 self.unit_tracker.clear_widgets() | 
					
						
							|  |  |  |                 self.trigger_tracker.clear_widgets() | 
					
						
							|  |  |  |                 for name, item in self.tracker_items.items(): | 
					
						
							|  |  |  |                     if item.type in ("Unit", "Trigger"): | 
					
						
							|  |  |  |                         status_color = (1, 1, 1, 1) if item.code is None or item.code in received_ids else (0.6, 0.2, 0.2, 1) | 
					
						
							|  |  |  |                         label = ItemLabel(text=name, color=status_color) | 
					
						
							|  |  |  |                         if item.type == "Unit": | 
					
						
							|  |  |  |                             self.unit_tracker.add_widget(label) | 
					
						
							|  |  |  |                         else: | 
					
						
							|  |  |  |                             self.trigger_tracker.add_widget(label) | 
					
						
							|  |  |  |                 self.boost_tracker.clear_widgets() | 
					
						
							|  |  |  |                 extra_income = received_ids.count(52023) * self.ctx.income_boost_multiplier | 
					
						
							|  |  |  |                 extra_defense = received_ids.count(52024) * self.ctx.commander_defense_boost_multiplier | 
					
						
							|  |  |  |                 income_boost = ItemLabel(text="Extra Income: " + str(extra_income)) | 
					
						
							|  |  |  |                 defense_boost = ItemLabel(text="Comm Defense: " + str(100 + extra_defense)) | 
					
						
							|  |  |  |                 self.boost_tracker.add_widget(income_boost) | 
					
						
							|  |  |  |                 self.boost_tracker.add_widget(defense_boost) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         self.ui = WargrooveManager(self) | 
					
						
							|  |  |  |         data = pkgutil.get_data(WargrooveWorld.__module__, "Wargroove.kv").decode() | 
					
						
							|  |  |  |         Builder.load_string(data) | 
					
						
							|  |  |  |         self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def update_commander_data(self): | 
					
						
							|  |  |  |         if self.can_choose_commander: | 
					
						
							|  |  |  |             faction_items = 0 | 
					
						
							|  |  |  |             faction_item_names = [faction + ' Commanders' for faction in faction_table.keys()] | 
					
						
							|  |  |  |             for network_item in self.items_received: | 
					
						
							| 
									
										
										
										
											2024-06-01 06:07:13 -05:00
										 |  |  |                 if self.item_names.lookup_in_slot(network_item.item) in faction_item_names: | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |                     faction_items += 1 | 
					
						
							|  |  |  |             starting_groove = (faction_items - 1) * self.starting_groove_multiplier | 
					
						
							|  |  |  |             # Must be an integer larger than 0 | 
					
						
							|  |  |  |             starting_groove = int(max(starting_groove, 0)) | 
					
						
							|  |  |  |             data = { | 
					
						
							|  |  |  |                 "commander": self.current_commander.internal_name, | 
					
						
							|  |  |  |                 "starting_groove": starting_groove | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             data = { | 
					
						
							|  |  |  |                 "commander": "seed", | 
					
						
							|  |  |  |                 "starting_groove": 0 | 
					
						
							|  |  |  |             } | 
					
						
							|  |  |  |         filename = 'commander.json' | 
					
						
							|  |  |  |         with open(os.path.join(self.game_communication_path, filename), 'w') as f: | 
					
						
							|  |  |  |             json.dump(data, f) | 
					
						
							|  |  |  |         if self.ui: | 
					
						
							|  |  |  |             self.ui.update_tracker() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def set_commander(self, commander_name: str) -> bool: | 
					
						
							|  |  |  |         """Sets the current commander to the given one, if possible""" | 
					
						
							|  |  |  |         if not self.can_choose_commander: | 
					
						
							|  |  |  |             wg_logger.error("Cannot set commanders in this game mode.") | 
					
						
							|  |  |  |             return | 
					
						
							|  |  |  |         match_name = commander_name.lower() | 
					
						
							|  |  |  |         for commander, unlocked in self.get_commanders(): | 
					
						
							|  |  |  |             if commander.name.lower() == match_name or commander.alt_name and commander.alt_name.lower() == match_name: | 
					
						
							|  |  |  |                 if unlocked: | 
					
						
							|  |  |  |                     self.current_commander = commander | 
					
						
							|  |  |  |                     self.syncing = True | 
					
						
							|  |  |  |                     wg_logger.info(f"Commander set to {commander.name}.") | 
					
						
							|  |  |  |                     self.update_commander_data() | 
					
						
							|  |  |  |                     return True | 
					
						
							|  |  |  |                 else: | 
					
						
							|  |  |  |                     wg_logger.error(f"Commander {commander.name} has not been unlocked.") | 
					
						
							|  |  |  |                     return False | 
					
						
							|  |  |  |         else: | 
					
						
							|  |  |  |             wg_logger.error(f"{commander_name} is not a recognized Wargroove commander.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     def get_commanders(self) -> List[Tuple[CommanderData, bool]]: | 
					
						
							|  |  |  |         """Gets a list of commanders with their unlocked status""" | 
					
						
							|  |  |  |         commanders = [] | 
					
						
							|  |  |  |         received_ids = [item.item for item in self.items_received] | 
					
						
							|  |  |  |         for faction in faction_table.keys(): | 
					
						
							|  |  |  |             unlocked = faction == 'Starter' or self.faction_item_ids[faction] in received_ids | 
					
						
							|  |  |  |             commanders += [(commander, unlocked) for commander in faction_table[faction]] | 
					
						
							|  |  |  |         return commanders | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | async def game_watcher(ctx: WargrooveContext): | 
					
						
							|  |  |  |     from worlds.wargroove.Locations import location_table | 
					
						
							|  |  |  |     while not ctx.exit_event.is_set(): | 
					
						
							|  |  |  |         if ctx.syncing == True: | 
					
						
							|  |  |  |             sync_msg = [{'cmd': 'Sync'}] | 
					
						
							|  |  |  |             if ctx.locations_checked: | 
					
						
							|  |  |  |                 sync_msg.append({"cmd": "LocationChecks", "locations": list(ctx.locations_checked)}) | 
					
						
							|  |  |  |             await ctx.send_msgs(sync_msg) | 
					
						
							|  |  |  |             ctx.syncing = False | 
					
						
							|  |  |  |         sending = [] | 
					
						
							|  |  |  |         victory = False | 
					
						
							|  |  |  |         for root, dirs, files in os.walk(ctx.game_communication_path): | 
					
						
							|  |  |  |             for file in files: | 
					
						
							|  |  |  |                 if file.find("send") > -1: | 
					
						
							|  |  |  |                     st = file.split("send", -1)[1] | 
					
						
							|  |  |  |                     sending = sending+[(int(st))] | 
					
						
							| 
									
										
										
										
											2023-11-16 05:35:20 -05:00
										 |  |  |                     os.remove(os.path.join(ctx.game_communication_path, file)) | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |                 if file.find("victory") > -1: | 
					
						
							|  |  |  |                     victory = True | 
					
						
							| 
									
										
										
										
											2023-11-16 05:35:20 -05:00
										 |  |  |                     os.remove(os.path.join(ctx.game_communication_path, file)) | 
					
						
							| 
									
										
										
										
											2023-02-24 01:35:09 -05:00
										 |  |  |         ctx.locations_checked = sending | 
					
						
							|  |  |  |         message = [{"cmd": 'LocationChecks', "locations": sending}] | 
					
						
							|  |  |  |         await ctx.send_msgs(message) | 
					
						
							|  |  |  |         if not ctx.finished_game and victory: | 
					
						
							|  |  |  |             await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}]) | 
					
						
							|  |  |  |             ctx.finished_game = True | 
					
						
							|  |  |  |         await asyncio.sleep(0.1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | def print_error_and_close(msg): | 
					
						
							|  |  |  |     logger.error("Error: " + msg) | 
					
						
							|  |  |  |     Utils.messagebox("Error", msg, error=True) | 
					
						
							|  |  |  |     sys.exit(1) | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  | if __name__ == '__main__': | 
					
						
							|  |  |  |     async def main(args): | 
					
						
							|  |  |  |         ctx = WargrooveContext(args.connect, args.password) | 
					
						
							|  |  |  |         ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop") | 
					
						
							|  |  |  |         if gui_enabled: | 
					
						
							|  |  |  |             ctx.run_gui() | 
					
						
							|  |  |  |         ctx.run_cli() | 
					
						
							|  |  |  |         progression_watcher = asyncio.create_task( | 
					
						
							|  |  |  |             game_watcher(ctx), name="WargrooveProgressionWatcher") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         await ctx.exit_event.wait() | 
					
						
							|  |  |  |         ctx.server_address = None | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         await progression_watcher | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |         await ctx.shutdown() | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     import colorama | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     parser = get_base_parser(description="Wargroove Client, for text interfacing.") | 
					
						
							|  |  |  | 
 | 
					
						
							|  |  |  |     args, rest = parser.parse_known_args() | 
					
						
							|  |  |  |     colorama.init() | 
					
						
							|  |  |  |     asyncio.run(main(args)) | 
					
						
							|  |  |  |     colorama.deinit() |