477 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
		
		
			
		
	
	
			477 lines
		
	
	
		
			20 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
|   | # pylint: disable=W0212,R0916,R0904 | ||
|  | from __future__ import annotations | ||
|  | 
 | ||
|  | import math | ||
|  | from functools import cached_property | ||
|  | from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple, Union | ||
|  | 
 | ||
|  | from .bot_ai_internal import BotAIInternal | ||
|  | from .cache import property_cache_once_per_frame | ||
|  | from .data import Alert, Result | ||
|  | from .position import Point2 | ||
|  | from .unit import Unit | ||
|  | from .units import Units | ||
|  | 
 | ||
|  | if TYPE_CHECKING: | ||
|  |     from .game_info import Ramp | ||
|  | 
 | ||
|  | 
 | ||
|  | class BotAI(BotAIInternal): | ||
|  |     """Base class for bots.""" | ||
|  | 
 | ||
|  |     EXPANSION_GAP_THRESHOLD = 15 | ||
|  | 
 | ||
|  |     @property | ||
|  |     def time(self) -> float: | ||
|  |         """ Returns time in seconds, assumes the game is played on 'faster' """ | ||
|  |         return self.state.game_loop / 22.4  # / (1/1.4) * (1/16) | ||
|  | 
 | ||
|  |     @property | ||
|  |     def time_formatted(self) -> str: | ||
|  |         """ Returns time as string in min:sec format """ | ||
|  |         t = self.time | ||
|  |         return f"{int(t // 60):02}:{int(t % 60):02}" | ||
|  | 
 | ||
|  |     @property | ||
|  |     def step_time(self) -> Tuple[float, float, float, float]: | ||
|  |         """Returns a tuple of step duration in milliseconds.
 | ||
|  |         First value is the minimum step duration - the shortest the bot ever took | ||
|  |         Second value is the average step duration | ||
|  |         Third value is the maximum step duration - the longest the bot ever took (including on_start()) | ||
|  |         Fourth value is the step duration the bot took last iteration | ||
|  |         If called in the first iteration, it returns (inf, 0, 0, 0)"""
 | ||
|  |         avg_step_duration = ( | ||
|  |             (self._total_time_in_on_step / self._total_steps_iterations) if self._total_steps_iterations else 0 | ||
|  |         ) | ||
|  |         return ( | ||
|  |             self._min_step_time * 1000, | ||
|  |             avg_step_duration * 1000, | ||
|  |             self._max_step_time * 1000, | ||
|  |             self._last_step_step_time * 1000, | ||
|  |         ) | ||
|  | 
 | ||
|  |     def alert(self, alert_code: Alert) -> bool: | ||
|  |         """
 | ||
|  |         Check if alert is triggered in the current step. | ||
|  |         Possible alerts are listed here https://github.com/Blizzard/s2client-proto/blob/e38efed74c03bec90f74b330ea1adda9215e655f/s2clientprotocol/sc2api.proto#L679-L702 | ||
|  | 
 | ||
|  |         Example use:: | ||
|  | 
 | ||
|  |             from sc2.data import Alert | ||
|  |             if self.alert(Alert.AddOnComplete): | ||
|  |                 print("Addon Complete") | ||
|  | 
 | ||
|  |         Alert codes:: | ||
|  | 
 | ||
|  |             AlertError | ||
|  |             AddOnComplete | ||
|  |             BuildingComplete | ||
|  |             BuildingUnderAttack | ||
|  |             LarvaHatched | ||
|  |             MergeComplete | ||
|  |             MineralsExhausted | ||
|  |             MorphComplete | ||
|  |             MothershipComplete | ||
|  |             MULEExpired | ||
|  |             NuclearLaunchDetected | ||
|  |             NukeComplete | ||
|  |             NydusWormDetected | ||
|  |             ResearchComplete | ||
|  |             TrainError | ||
|  |             TrainUnitComplete | ||
|  |             TrainWorkerComplete | ||
|  |             TransformationComplete | ||
|  |             UnitUnderAttack | ||
|  |             UpgradeComplete | ||
|  |             VespeneExhausted | ||
|  |             WarpInComplete | ||
|  | 
 | ||
|  |         :param alert_code: | ||
|  |         """
 | ||
|  |         assert isinstance(alert_code, Alert), f"alert_code {alert_code} is no Alert" | ||
|  |         return alert_code.value in self.state.alerts | ||
|  | 
 | ||
|  |     @property | ||
|  |     def start_location(self) -> Point2: | ||
|  |         """
 | ||
|  |         Returns the spawn location of the bot, using the position of the first created townhall. | ||
|  |         This will be None if the bot is run on an arcade or custom map that does not feature townhalls at game start. | ||
|  |         """
 | ||
|  |         return self.game_info.player_start_location | ||
|  | 
 | ||
|  |     @property | ||
|  |     def enemy_start_locations(self) -> List[Point2]: | ||
|  |         """Possible start locations for enemies.""" | ||
|  |         return self.game_info.start_locations | ||
|  | 
 | ||
|  |     @cached_property | ||
|  |     def main_base_ramp(self) -> Ramp: | ||
|  |         """Returns the Ramp instance of the closest main-ramp to start location.
 | ||
|  |         Look in game_info.py for more information about the Ramp class | ||
|  | 
 | ||
|  |         Example: See terran ramp wall bot | ||
|  |         """
 | ||
|  |         # The reason for len(ramp.upper) in {2, 5} is: | ||
|  |         # ParaSite map has 5 upper points, and most other maps have 2 upper points at the main ramp. | ||
|  |         # The map Acolyte has 4 upper points at the wrong ramp (which is closest to the start position). | ||
|  |         try: | ||
|  |             found_main_base_ramp = min( | ||
|  |                 (ramp for ramp in self.game_info.map_ramps if len(ramp.upper) in {2, 5}), | ||
|  |                 key=lambda r: self.start_location.distance_to(r.top_center), | ||
|  |             ) | ||
|  |         except ValueError: | ||
|  |             # Hardcoded hotfix for Honorgrounds LE map, as that map has a large main base ramp with inbase natural | ||
|  |             found_main_base_ramp = min( | ||
|  |                 (ramp for ramp in self.game_info.map_ramps if len(ramp.upper) in {4, 9}), | ||
|  |                 key=lambda r: self.start_location.distance_to(r.top_center), | ||
|  |             ) | ||
|  |         return found_main_base_ramp | ||
|  | 
 | ||
|  |     @property_cache_once_per_frame | ||
|  |     def expansion_locations_list(self) -> List[Point2]: | ||
|  |         """ Returns a list of expansion positions, not sorted in any way. """ | ||
|  |         assert ( | ||
|  |             self._expansion_positions_list | ||
|  |         ), "self._find_expansion_locations() has not been run yet, so accessing the list of expansion locations is pointless." | ||
|  |         return self._expansion_positions_list | ||
|  | 
 | ||
|  |     @property_cache_once_per_frame | ||
|  |     def expansion_locations_dict(self) -> Dict[Point2, Units]: | ||
|  |         """
 | ||
|  |         Returns dict with the correct expansion position Point2 object as key, | ||
|  |         resources as Units (mineral fields and vespene geysers) as value. | ||
|  | 
 | ||
|  |         Caution: This function is slow. If you only need the expansion locations, use the property above. | ||
|  |         """
 | ||
|  |         assert ( | ||
|  |             self._expansion_positions_list | ||
|  |         ), "self._find_expansion_locations() has not been run yet, so accessing the list of expansion locations is pointless." | ||
|  |         expansion_locations: Dict[Point2, Units] = {pos: Units([], self) for pos in self._expansion_positions_list} | ||
|  |         for resource in self.resources: | ||
|  |             # It may be that some resources are not mapped to an expansion location | ||
|  |             exp_position: Point2 = self._resource_location_to_expansion_position_dict.get(resource.position, None) | ||
|  |             if exp_position: | ||
|  |                 assert exp_position in expansion_locations | ||
|  |                 expansion_locations[exp_position].append(resource) | ||
|  |         return expansion_locations | ||
|  | 
 | ||
|  |     async def get_next_expansion(self) -> Optional[Point2]: | ||
|  |         """Find next expansion location.""" | ||
|  | 
 | ||
|  |         closest = None | ||
|  |         distance = math.inf | ||
|  |         for el in self.expansion_locations_list: | ||
|  | 
 | ||
|  |             def is_near_to_expansion(t): | ||
|  |                 return t.distance_to(el) < self.EXPANSION_GAP_THRESHOLD | ||
|  | 
 | ||
|  |             if any(map(is_near_to_expansion, self.townhalls)): | ||
|  |                 # already taken | ||
|  |                 continue | ||
|  | 
 | ||
|  |             startp = self.game_info.player_start_location | ||
|  |             d = await self.client.query_pathing(startp, el) | ||
|  |             if d is None: | ||
|  |                 continue | ||
|  | 
 | ||
|  |             if d < distance: | ||
|  |                 distance = d | ||
|  |                 closest = el | ||
|  | 
 | ||
|  |         return closest | ||
|  | 
 | ||
|  |     # pylint: disable=R0912 | ||
|  |     async def distribute_workers(self, resource_ratio: float = 2): | ||
|  |         """
 | ||
|  |         Distributes workers across all the bases taken. | ||
|  |         Keyword `resource_ratio` takes a float. If the current minerals to gas | ||
|  |         ratio is bigger than `resource_ratio`, this function prefer filling gas_buildings | ||
|  |         first, if it is lower, it will prefer sending workers to minerals first. | ||
|  | 
 | ||
|  |         NOTE: This function is far from optimal, if you really want to have | ||
|  |         refined worker control, you should write your own distribution function. | ||
|  |         For example long distance mining control and moving workers if a base was killed | ||
|  |         are not being handled. | ||
|  | 
 | ||
|  |         WARNING: This is quite slow when there are lots of workers or multiple bases. | ||
|  | 
 | ||
|  |         :param resource_ratio:"""
 | ||
|  |         if not self.mineral_field or not self.workers or not self.townhalls.ready: | ||
|  |             return | ||
|  |         worker_pool = self.workers.idle | ||
|  |         bases = self.townhalls.ready | ||
|  |         gas_buildings = self.gas_buildings.ready | ||
|  | 
 | ||
|  |         # list of places that need more workers | ||
|  |         deficit_mining_places = [] | ||
|  | 
 | ||
|  |         for mining_place in bases | gas_buildings: | ||
|  |             difference = mining_place.surplus_harvesters | ||
|  |             # perfect amount of workers, skip mining place | ||
|  |             if not difference: | ||
|  |                 continue | ||
|  |             if mining_place.has_vespene: | ||
|  |                 # get all workers that target the gas extraction site | ||
|  |                 # or are on their way back from it | ||
|  |                 local_workers = self.workers.filter( | ||
|  |                     lambda unit: unit.order_target == mining_place.tag or | ||
|  |                     (unit.is_carrying_vespene and unit.order_target == bases.closest_to(mining_place).tag) | ||
|  |                 ) | ||
|  |             else: | ||
|  |                 # get tags of minerals around expansion | ||
|  |                 local_minerals_tags = { | ||
|  |                     mineral.tag | ||
|  |                     for mineral in self.mineral_field if mineral.distance_to(mining_place) <= 8 | ||
|  |                 } | ||
|  |                 # get all target tags a worker can have | ||
|  |                 # tags of the minerals he could mine at that base | ||
|  |                 # get workers that work at that gather site | ||
|  |                 local_workers = self.workers.filter( | ||
|  |                     lambda unit: unit.order_target in local_minerals_tags or | ||
|  |                     (unit.is_carrying_minerals and unit.order_target == mining_place.tag) | ||
|  |                 ) | ||
|  |             # too many workers | ||
|  |             if difference > 0: | ||
|  |                 for worker in local_workers[:difference]: | ||
|  |                     worker_pool.append(worker) | ||
|  |             # too few workers | ||
|  |             # add mining place to deficit bases for every missing worker | ||
|  |             else: | ||
|  |                 deficit_mining_places += [mining_place for _ in range(-difference)] | ||
|  | 
 | ||
|  |         # prepare all minerals near a base if we have too many workers | ||
|  |         # and need to send them to the closest patch | ||
|  |         if len(worker_pool) > len(deficit_mining_places): | ||
|  |             all_minerals_near_base = [ | ||
|  |                 mineral for mineral in self.mineral_field | ||
|  |                 if any(mineral.distance_to(base) <= 8 for base in self.townhalls.ready) | ||
|  |             ] | ||
|  |         # distribute every worker in the pool | ||
|  |         for worker in worker_pool: | ||
|  |             # as long as have workers and mining places | ||
|  |             if deficit_mining_places: | ||
|  |                 # choose only mineral fields first if current mineral to gas ratio is less than target ratio | ||
|  |                 if self.vespene and self.minerals / self.vespene < resource_ratio: | ||
|  |                     possible_mining_places = [place for place in deficit_mining_places if not place.vespene_contents] | ||
|  |                 # else prefer gas | ||
|  |                 else: | ||
|  |                     possible_mining_places = [place for place in deficit_mining_places if place.vespene_contents] | ||
|  |                 # if preferred type is not available any more, get all other places | ||
|  |                 if not possible_mining_places: | ||
|  |                     possible_mining_places = deficit_mining_places | ||
|  |                 # find closest mining place | ||
|  |                 current_place = min(deficit_mining_places, key=lambda place: place.distance_to(worker)) | ||
|  |                 # remove it from the list | ||
|  |                 deficit_mining_places.remove(current_place) | ||
|  |                 # if current place is a gas extraction site, go there | ||
|  |                 if current_place.vespene_contents: | ||
|  |                     worker.gather(current_place) | ||
|  |                 # if current place is a gas extraction site, | ||
|  |                 # go to the mineral field that is near and has the most minerals left | ||
|  |                 else: | ||
|  |                     local_minerals = ( | ||
|  |                         mineral for mineral in self.mineral_field if mineral.distance_to(current_place) <= 8 | ||
|  |                     ) | ||
|  |                     # local_minerals can be empty if townhall is misplaced | ||
|  |                     target_mineral = max(local_minerals, key=lambda mineral: mineral.mineral_contents, default=None) | ||
|  |                     if target_mineral: | ||
|  |                         worker.gather(target_mineral) | ||
|  |             # more workers to distribute than free mining spots | ||
|  |             # send to closest if worker is doing nothing | ||
|  |             elif worker.is_idle and all_minerals_near_base: | ||
|  |                 target_mineral = min(all_minerals_near_base, key=lambda mineral: mineral.distance_to(worker)) | ||
|  |                 worker.gather(target_mineral) | ||
|  |             else: | ||
|  |                 # there are no deficit mining places and worker is not idle | ||
|  |                 # so dont move him | ||
|  |                 pass | ||
|  | 
 | ||
|  |     @property_cache_once_per_frame | ||
|  |     def owned_expansions(self) -> Dict[Point2, Unit]: | ||
|  |         """Dict of expansions owned by the player with mapping {expansion_location: townhall_structure}.""" | ||
|  |         owned = {} | ||
|  |         for el in self.expansion_locations_list: | ||
|  | 
 | ||
|  |             def is_near_to_expansion(t): | ||
|  |                 return t.distance_to(el) < self.EXPANSION_GAP_THRESHOLD | ||
|  | 
 | ||
|  |             th = next((x for x in self.townhalls if is_near_to_expansion(x)), None) | ||
|  |             if th: | ||
|  |                 owned[el] = th | ||
|  |         return owned | ||
|  | 
 | ||
|  |     async def chat_send(self, message: str, team_only: bool = False): | ||
|  |         """Send a chat message to the SC2 Client.
 | ||
|  | 
 | ||
|  |         Example:: | ||
|  | 
 | ||
|  |             await self.chat_send("Hello, this is a message from my bot!") | ||
|  | 
 | ||
|  |         :param message: | ||
|  |         :param team_only:"""
 | ||
|  |         assert isinstance(message, str), f"{message} is not a string" | ||
|  |         await self.client.chat_send(message, team_only) | ||
|  | 
 | ||
|  |     def in_map_bounds(self, pos: Union[Point2, tuple, list]) -> bool: | ||
|  |         """Tests if a 2 dimensional point is within the map boundaries of the pixelmaps.
 | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         return ( | ||
|  |             self.game_info.playable_area.x <= pos[0] < | ||
|  |             self.game_info.playable_area.x + self.game_info.playable_area.width and self.game_info.playable_area.y <= | ||
|  |             pos[1] < self.game_info.playable_area.y + self.game_info.playable_area.height | ||
|  |         ) | ||
|  | 
 | ||
|  |     # For the functions below, make sure you are inside the boundaries of the map size. | ||
|  |     def get_terrain_height(self, pos: Union[Point2, Unit]) -> int: | ||
|  |         """Returns terrain height at a position.
 | ||
|  |         Caution: terrain height is different from a unit's z-coordinate. | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         assert isinstance(pos, (Point2, Unit)), "pos is not of type Point2 or Unit" | ||
|  |         pos = pos.position.rounded | ||
|  |         return self.game_info.terrain_height[pos] | ||
|  | 
 | ||
|  |     def get_terrain_z_height(self, pos: Union[Point2, Unit]) -> float: | ||
|  |         """Returns terrain z-height at a position.
 | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         assert isinstance(pos, (Point2, Unit)), "pos is not of type Point2 or Unit" | ||
|  |         pos = pos.position.rounded | ||
|  |         return -16 + 32 * self.game_info.terrain_height[pos] / 255 | ||
|  | 
 | ||
|  |     def in_placement_grid(self, pos: Union[Point2, Unit]) -> bool: | ||
|  |         """Returns True if you can place something at a position.
 | ||
|  |         Remember, buildings usually use 2x2, 3x3 or 5x5 of these grid points. | ||
|  |         Caution: some x and y offset might be required, see ramp code in game_info.py | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         assert isinstance(pos, (Point2, Unit)), "pos is not of type Point2 or Unit" | ||
|  |         pos = pos.position.rounded | ||
|  |         return self.game_info.placement_grid[pos] == 1 | ||
|  | 
 | ||
|  |     def in_pathing_grid(self, pos: Union[Point2, Unit]) -> bool: | ||
|  |         """Returns True if a ground unit can pass through a grid point.
 | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         assert isinstance(pos, (Point2, Unit)), "pos is not of type Point2 or Unit" | ||
|  |         pos = pos.position.rounded | ||
|  |         return self.game_info.pathing_grid[pos] == 1 | ||
|  | 
 | ||
|  |     def is_visible(self, pos: Union[Point2, Unit]) -> bool: | ||
|  |         """Returns True if you have vision on a grid point.
 | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         # more info: https://github.com/Blizzard/s2client-proto/blob/9906df71d6909511907d8419b33acc1a3bd51ec0/s2clientprotocol/spatial.proto#L19 | ||
|  |         assert isinstance(pos, (Point2, Unit)), "pos is not of type Point2 or Unit" | ||
|  |         pos = pos.position.rounded | ||
|  |         return self.state.visibility[pos] == 2 | ||
|  | 
 | ||
|  |     def has_creep(self, pos: Union[Point2, Unit]) -> bool: | ||
|  |         """Returns True if there is creep on the grid point.
 | ||
|  | 
 | ||
|  |         :param pos:"""
 | ||
|  |         assert isinstance(pos, (Point2, Unit)), "pos is not of type Point2 or Unit" | ||
|  |         pos = pos.position.rounded | ||
|  |         return self.state.creep[pos] == 1 | ||
|  | 
 | ||
|  |     async def on_unit_destroyed(self, unit_tag: int): | ||
|  |         """
 | ||
|  |         Override this in your bot class. | ||
|  |         Note that this function uses unit tags and not the unit objects | ||
|  |         because the unit does not exist any more. | ||
|  |         This will event will be called when a unit (or structure, friendly or enemy) dies. | ||
|  |         For enemy units, this only works if the enemy unit was in vision on death. | ||
|  | 
 | ||
|  |         :param unit_tag: | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_unit_created(self, unit: Unit): | ||
|  |         """Override this in your bot class. This function is called when a unit is created.
 | ||
|  | 
 | ||
|  |         :param unit:"""
 | ||
|  | 
 | ||
|  |     async def on_building_construction_started(self, unit: Unit): | ||
|  |         """
 | ||
|  |         Override this in your bot class. | ||
|  |         This function is called when a building construction has started. | ||
|  | 
 | ||
|  |         :param unit: | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_building_construction_complete(self, unit: Unit): | ||
|  |         """
 | ||
|  |         Override this in your bot class. This function is called when a building | ||
|  |         construction is completed. | ||
|  | 
 | ||
|  |         :param unit: | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_unit_took_damage(self, unit: Unit, amount_damage_taken: float): | ||
|  |         """
 | ||
|  |         Override this in your bot class. This function is called when your own unit (unit or structure) took damage. | ||
|  |         It will not be called if the unit died this frame. | ||
|  | 
 | ||
|  |         This may be called frequently for terran structures that are burning down, or zerg buildings that are off creep, | ||
|  |         or terran bio units that just used stimpack ability. | ||
|  |         TODO: If there is a demand for it, then I can add a similar event for when enemy units took damage | ||
|  | 
 | ||
|  |         Examples:: | ||
|  | 
 | ||
|  |             print(f"My unit took damage: {unit} took {amount_damage_taken} damage") | ||
|  | 
 | ||
|  |         :param unit: | ||
|  |         :param amount_damage_taken: | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_enemy_unit_entered_vision(self, unit: Unit): | ||
|  |         """
 | ||
|  |         Override this in your bot class. This function is called when an enemy unit (unit or structure) entered vision (which was not visible last frame). | ||
|  | 
 | ||
|  |         :param unit: | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_enemy_unit_left_vision(self, unit_tag: int): | ||
|  |         """
 | ||
|  |         Override this in your bot class. This function is called when an enemy unit (unit or structure) left vision (which was visible last frame). | ||
|  |         Same as the self.on_unit_destroyed event, this function is called with the unit's tag because the unit is no longer visible anymore. | ||
|  |         If you want to store a snapshot of the unit, use self._enemy_units_previous_map[unit_tag] for units or self._enemy_structures_previous_map[unit_tag] for structures. | ||
|  | 
 | ||
|  |         Examples:: | ||
|  | 
 | ||
|  |             last_known_unit = self._enemy_units_previous_map.get(unit_tag, None) or self._enemy_structures_previous_map[unit_tag] | ||
|  |             print(f"Enemy unit left vision, last known location: {last_known_unit.position}") | ||
|  | 
 | ||
|  |         :param unit_tag: | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_before_start(self): | ||
|  |         """
 | ||
|  |         Override this in your bot class. This function is called before "on_start" | ||
|  |         and before "prepare_first_step" that calculates expansion locations. | ||
|  |         Not all data is available yet. | ||
|  |         This function is useful in realtime=True mode to split your workers or start producing the first worker. | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_start(self): | ||
|  |         """
 | ||
|  |         Override this in your bot class. | ||
|  |         At this point, game_data, game_info and the first iteration of game_state (self.state) are available. | ||
|  |         """
 | ||
|  | 
 | ||
|  |     async def on_step(self, iteration: int): | ||
|  |         """
 | ||
|  |         You need to implement this function! | ||
|  |         Override this in your bot class. | ||
|  |         This function is called on every game step (looped in realtime mode). | ||
|  | 
 | ||
|  |         :param iteration: | ||
|  |         """
 | ||
|  |         raise NotImplementedError | ||
|  | 
 | ||
|  |     async def on_end(self, game_result: Result): | ||
|  |         """Override this in your bot class. This function is called at the end of a game.
 | ||
|  |         Unsure if this function will be called on the laddermanager client as the bot process may forcefully be terminated. | ||
|  | 
 | ||
|  |         :param game_result:"""
 |