721 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			721 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| from __future__ import annotations
 | |
| 
 | |
| from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
 | |
| 
 | |
| from worlds._sc2common.bot import logger
 | |
| 
 | |
| from s2clientprotocol import debug_pb2 as debug_pb
 | |
| from s2clientprotocol import query_pb2 as query_pb
 | |
| from s2clientprotocol import raw_pb2 as raw_pb
 | |
| from s2clientprotocol import sc2api_pb2 as sc_pb
 | |
| from s2clientprotocol import spatial_pb2 as spatial_pb
 | |
| 
 | |
| from .data import ActionResult, ChatChannel, Race, Result, Status
 | |
| from .game_data import AbilityData, GameData
 | |
| from .game_info import GameInfo
 | |
| from .position import Point2, Point3
 | |
| from .protocol import ConnectionAlreadyClosed, Protocol, ProtocolError
 | |
| from .renderer import Renderer
 | |
| from .unit import Unit
 | |
| from .units import Units
 | |
| 
 | |
| 
 | |
| # pylint: disable=R0904
 | |
| class Client(Protocol):
 | |
| 
 | |
|     def __init__(self, ws, save_replay_path: str = None):
 | |
|         """
 | |
|         :param ws:
 | |
|         """
 | |
|         super().__init__(ws)
 | |
|         # How many frames will be waited between iterations before the next one is called
 | |
|         self.game_step: int = 4
 | |
|         self.save_replay_path: Optional[str] = save_replay_path
 | |
|         self._player_id = None
 | |
|         self._game_result = None
 | |
|         # Store a hash value of all the debug requests to prevent sending the same ones again if they haven't changed last frame
 | |
|         self._debug_hash_tuple_last_iteration: Tuple[int, int, int, int] = (0, 0, 0, 0)
 | |
|         self._debug_draw_last_frame = False
 | |
|         self._debug_texts = []
 | |
|         self._debug_lines = []
 | |
|         self._debug_boxes = []
 | |
|         self._debug_spheres = []
 | |
| 
 | |
|         self._renderer = None
 | |
|         self.raw_affects_selection = False
 | |
| 
 | |
|     @property
 | |
|     def in_game(self) -> bool:
 | |
|         return self._status in {Status.in_game, Status.in_replay}
 | |
| 
 | |
|     async def join_game(self, name=None, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None):
 | |
|         ifopts = sc_pb.InterfaceOptions(
 | |
|             raw=True,
 | |
|             score=True,
 | |
|             show_cloaked=True,
 | |
|             show_burrowed_shadows=True,
 | |
|             raw_affects_selection=self.raw_affects_selection,
 | |
|             raw_crop_to_playable_area=False,
 | |
|             show_placeholders=True,
 | |
|         )
 | |
| 
 | |
|         if rgb_render_config:
 | |
|             assert isinstance(rgb_render_config, dict)
 | |
|             assert "window_size" in rgb_render_config and "minimap_size" in rgb_render_config
 | |
|             window_size = rgb_render_config["window_size"]
 | |
|             minimap_size = rgb_render_config["minimap_size"]
 | |
|             self._renderer = Renderer(self, window_size, minimap_size)
 | |
|             map_width, map_height = window_size
 | |
|             minimap_width, minimap_height = minimap_size
 | |
| 
 | |
|             ifopts.render.resolution.x = map_width
 | |
|             ifopts.render.resolution.y = map_height
 | |
|             ifopts.render.minimap_resolution.x = minimap_width
 | |
|             ifopts.render.minimap_resolution.y = minimap_height
 | |
| 
 | |
|         if race is None:
 | |
|             assert isinstance(observed_player_id, int), f"observed_player_id is of type {type(observed_player_id)}"
 | |
|             # join as observer
 | |
|             req = sc_pb.RequestJoinGame(observed_player_id=observed_player_id, options=ifopts)
 | |
|         else:
 | |
|             assert isinstance(race, Race)
 | |
|             req = sc_pb.RequestJoinGame(race=race.value, options=ifopts)
 | |
| 
 | |
|         if portconfig:
 | |
|             req.server_ports.game_port = portconfig.server[0]
 | |
|             req.server_ports.base_port = portconfig.server[1]
 | |
| 
 | |
|             for ppc in portconfig.players:
 | |
|                 p = req.client_ports.add()
 | |
|                 p.game_port = ppc[0]
 | |
|                 p.base_port = ppc[1]
 | |
| 
 | |
|         if name is not None:
 | |
|             assert isinstance(name, str), f"name is of type {type(name)}"
 | |
|             req.player_name = name
 | |
| 
 | |
|         result = await self._execute(join_game=req)
 | |
|         self._game_result = None
 | |
|         self._player_id = result.join_game.player_id
 | |
|         return result.join_game.player_id
 | |
| 
 | |
|     async def leave(self):
 | |
|         """ You can use 'await self.client.leave()' to surrender midst game. """
 | |
|         is_resign = self._game_result is None
 | |
| 
 | |
|         if is_resign:
 | |
|             # For all clients that can leave, result of leaving the game either
 | |
|             # loss, or the client will ignore the result
 | |
|             self._game_result = {self._player_id: Result.Defeat}
 | |
| 
 | |
|         try:
 | |
|             if self.save_replay_path is not None:
 | |
|                 await self.save_replay(self.save_replay_path)
 | |
|                 self.save_replay_path = None
 | |
|             await self._execute(leave_game=sc_pb.RequestLeaveGame())
 | |
|         except (ProtocolError, ConnectionAlreadyClosed):
 | |
|             if is_resign:
 | |
|                 raise
 | |
| 
 | |
|     async def save_replay(self, path):
 | |
|         logger.debug("Requesting replay from server")
 | |
|         result = await self._execute(save_replay=sc_pb.RequestSaveReplay())
 | |
|         with open(path, "wb") as f:
 | |
|             f.write(result.save_replay.data)
 | |
|         logger.info(f"Saved replay to {path}")
 | |
| 
 | |
|     async def observation(self, game_loop: int = None):
 | |
|         if game_loop is not None:
 | |
|             result = await self._execute(observation=sc_pb.RequestObservation(game_loop=game_loop))
 | |
|         else:
 | |
|             result = await self._execute(observation=sc_pb.RequestObservation())
 | |
|         assert result.HasField("observation")
 | |
| 
 | |
|         if not self.in_game or result.observation.player_result:
 | |
|             # Sometimes game ends one step before results are available
 | |
|             if not result.observation.player_result:
 | |
|                 result = await self._execute(observation=sc_pb.RequestObservation())
 | |
|                 assert result.observation.player_result
 | |
| 
 | |
|             player_id_to_result = {}
 | |
|             for pr in result.observation.player_result:
 | |
|                 player_id_to_result[pr.player_id] = Result(pr.result)
 | |
|             self._game_result = player_id_to_result
 | |
|         self._game_result = None
 | |
| 
 | |
|         # if render_data is available, then RGB rendering was requested
 | |
|         if self._renderer and result.observation.observation.HasField("render_data"):
 | |
|             await self._renderer.render(result.observation)
 | |
| 
 | |
|         return result
 | |
| 
 | |
|     async def step(self, step_size: int = None):
 | |
|         """ EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second """
 | |
|         step_size = step_size or self.game_step
 | |
|         return await self._execute(step=sc_pb.RequestStep(count=step_size))
 | |
| 
 | |
|     async def get_game_data(self) -> GameData:
 | |
|         result = await self._execute(
 | |
|             data=sc_pb.RequestData(ability_id=True, unit_type_id=True, upgrade_id=True, buff_id=True, effect_id=True)
 | |
|         )
 | |
|         return GameData(result.data)
 | |
| 
 | |
|     async def dump_data(self, ability_id=True, unit_type_id=True, upgrade_id=True, buff_id=True, effect_id=True):
 | |
|         """
 | |
|         Dump the game data files
 | |
|         choose what data to dump in the keywords
 | |
|         this function writes to a text file
 | |
|         call it one time in on_step with:
 | |
|         await self._client.dump_data()
 | |
|         """
 | |
|         result = await self._execute(
 | |
|             data=sc_pb.RequestData(
 | |
|                 ability_id=ability_id,
 | |
|                 unit_type_id=unit_type_id,
 | |
|                 upgrade_id=upgrade_id,
 | |
|                 buff_id=buff_id,
 | |
|                 effect_id=effect_id,
 | |
|             )
 | |
|         )
 | |
|         with open("data_dump.txt", "a") as file:
 | |
|             file.write(str(result.data))
 | |
| 
 | |
|     async def get_game_info(self) -> GameInfo:
 | |
|         result = await self._execute(game_info=sc_pb.RequestGameInfo())
 | |
|         return GameInfo(result.game_info)
 | |
| 
 | |
|     async def query_pathing(self, start: Union[Unit, Point2, Point3],
 | |
|                             end: Union[Point2, Point3]) -> Optional[Union[int, float]]:
 | |
|         """Caution: returns "None" when path not found
 | |
|         Try to combine queries with the function below because the pathing query is generally slow.
 | |
| 
 | |
|         :param start:
 | |
|         :param end:"""
 | |
|         assert isinstance(start, (Point2, Unit))
 | |
|         assert isinstance(end, Point2)
 | |
|         if isinstance(start, Point2):
 | |
|             path = [query_pb.RequestQueryPathing(start_pos=start.as_Point2D, end_pos=end.as_Point2D)]
 | |
|         else:
 | |
|             path = [query_pb.RequestQueryPathing(unit_tag=start.tag, end_pos=end.as_Point2D)]
 | |
|         result = await self._execute(query=query_pb.RequestQuery(pathing=path))
 | |
|         distance = float(result.query.pathing[0].distance)
 | |
|         if distance <= 0.0:
 | |
|             return None
 | |
|         return distance
 | |
| 
 | |
|     async def query_pathings(self, zipped_list: List[List[Union[Unit, Point2, Point3]]]) -> List[float]:
 | |
|         """Usage: await self.query_pathings([[unit1, target2], [unit2, target2]])
 | |
|         -> returns [distance1, distance2]
 | |
|         Caution: returns 0 when path not found
 | |
| 
 | |
|         :param zipped_list:
 | |
|         """
 | |
|         assert zipped_list, "No zipped_list"
 | |
|         assert isinstance(zipped_list, list), f"{type(zipped_list)}"
 | |
|         assert isinstance(zipped_list[0], list), f"{type(zipped_list[0])}"
 | |
|         assert len(zipped_list[0]) == 2, f"{len(zipped_list[0])}"
 | |
|         assert isinstance(zipped_list[0][0], (Point2, Unit)), f"{type(zipped_list[0][0])}"
 | |
|         assert isinstance(zipped_list[0][1], Point2), f"{type(zipped_list[0][1])}"
 | |
|         if isinstance(zipped_list[0][0], Point2):
 | |
|             path = (
 | |
|                 query_pb.RequestQueryPathing(start_pos=p1.as_Point2D, end_pos=p2.as_Point2D) for p1, p2 in zipped_list
 | |
|             )
 | |
|         else:
 | |
|             path = (query_pb.RequestQueryPathing(unit_tag=p1.tag, end_pos=p2.as_Point2D) for p1, p2 in zipped_list)
 | |
|         results = await self._execute(query=query_pb.RequestQuery(pathing=path))
 | |
|         return [float(d.distance) for d in results.query.pathing]
 | |
| 
 | |
|     async def query_building_placement(
 | |
|         self,
 | |
|         ability: AbilityData,
 | |
|         positions: List[Union[Point2, Point3]],
 | |
|         ignore_resources: bool = True
 | |
|     ) -> List[ActionResult]:
 | |
|         """This function might be deleted in favor of the function above (_query_building_placement_fast).
 | |
| 
 | |
|         :param ability:
 | |
|         :param positions:
 | |
|         :param ignore_resources:"""
 | |
|         assert isinstance(ability, AbilityData)
 | |
|         result = await self._execute(
 | |
|             query=query_pb.RequestQuery(
 | |
|                 placements=(
 | |
|                     query_pb.RequestQueryBuildingPlacement(ability_id=ability.id.value, target_pos=position.as_Point2D)
 | |
|                     for position in positions
 | |
|                 ),
 | |
|                 ignore_resource_requirements=ignore_resources,
 | |
|             )
 | |
|         )
 | |
|         # Unnecessary converting to ActionResult?
 | |
|         return [ActionResult(p.result) for p in result.query.placements]
 | |
| 
 | |
|     async def chat_send(self, message: str, team_only: bool):
 | |
|         """ Writes a message to the chat """
 | |
|         ch = ChatChannel.Team if team_only else ChatChannel.Broadcast
 | |
|         await self._execute(
 | |
|             action=sc_pb.RequestAction(
 | |
|                 actions=[sc_pb.Action(action_chat=sc_pb.ActionChat(channel=ch.value, message=message))]
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     async def debug_kill_unit(self, unit_tags: Union[Unit, Units, List[int], Set[int]]):
 | |
|         """
 | |
|         :param unit_tags:
 | |
|         """
 | |
|         if isinstance(unit_tags, Units):
 | |
|             unit_tags = unit_tags.tags
 | |
|         if isinstance(unit_tags, Unit):
 | |
|             unit_tags = [unit_tags.tag]
 | |
|         assert unit_tags
 | |
| 
 | |
|         await self._execute(
 | |
|             debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(kill_unit=debug_pb.DebugKillUnit(tag=unit_tags))])
 | |
|         )
 | |
| 
 | |
|     async def move_camera(self, position: Union[Unit, Units, Point2, Point3]):
 | |
|         """Moves camera to the target position
 | |
| 
 | |
|         :param position:"""
 | |
|         assert isinstance(position, (Unit, Units, Point2, Point3))
 | |
|         if isinstance(position, Units):
 | |
|             position = position.center
 | |
|         if isinstance(position, Unit):
 | |
|             position = position.position
 | |
|         await self._execute(
 | |
|             action=sc_pb.RequestAction(
 | |
|                 actions=[
 | |
|                     sc_pb.Action(
 | |
|                         action_raw=raw_pb.ActionRaw(
 | |
|                             camera_move=raw_pb.ActionRawCameraMove(center_world_space=position.to3.as_Point)
 | |
|                         )
 | |
|                     )
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     async def obs_move_camera(self, position: Union[Unit, Units, Point2, Point3]):
 | |
|         """Moves observer camera to the target position. Only works when observing (e.g. watching the replay).
 | |
| 
 | |
|         :param position:"""
 | |
|         assert isinstance(position, (Unit, Units, Point2, Point3))
 | |
|         if isinstance(position, Units):
 | |
|             position = position.center
 | |
|         if isinstance(position, Unit):
 | |
|             position = position.position
 | |
|         await self._execute(
 | |
|             obs_action=sc_pb.RequestObserverAction(
 | |
|                 actions=[
 | |
|                     sc_pb.ObserverAction(camera_move=sc_pb.ActionObserverCameraMove(world_pos=position.as_Point2D))
 | |
|                 ]
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     async def move_camera_spatial(self, position: Union[Point2, Point3]):
 | |
|         """Moves camera to the target position using the spatial aciton interface
 | |
| 
 | |
|         :param position:"""
 | |
|         assert isinstance(position, (Point2, Point3))
 | |
|         action = sc_pb.Action(
 | |
|             action_render=spatial_pb.ActionSpatial(
 | |
|                 camera_move=spatial_pb.ActionSpatialCameraMove(center_minimap=position.as_PointI)
 | |
|             )
 | |
|         )
 | |
|         await self._execute(action=sc_pb.RequestAction(actions=[action]))
 | |
| 
 | |
|     def debug_text_simple(self, text: str):
 | |
|         """ Draws a text in the top left corner of the screen (up to a max of 6 messages fit there). """
 | |
|         self._debug_texts.append(DrawItemScreenText(text=text, color=None, start_point=Point2((0, 0)), font_size=8))
 | |
| 
 | |
|     def debug_text_screen(
 | |
|         self,
 | |
|         text: str,
 | |
|         pos: Union[Point2, Point3, tuple, list],
 | |
|         color: Union[tuple, list, Point3] = None,
 | |
|         size: int = 8,
 | |
|     ):
 | |
|         """
 | |
|         Draws a text on the screen (monitor / game window) with coordinates 0 <= x, y <= 1.
 | |
| 
 | |
|         :param text:
 | |
|         :param pos:
 | |
|         :param color:
 | |
|         :param size:
 | |
|         """
 | |
|         assert len(pos) >= 2
 | |
|         assert 0 <= pos[0] <= 1
 | |
|         assert 0 <= pos[1] <= 1
 | |
|         pos = Point2((pos[0], pos[1]))
 | |
|         self._debug_texts.append(DrawItemScreenText(text=text, color=color, start_point=pos, font_size=size))
 | |
| 
 | |
|     def debug_text_2d(
 | |
|         self,
 | |
|         text: str,
 | |
|         pos: Union[Point2, Point3, tuple, list],
 | |
|         color: Union[tuple, list, Point3] = None,
 | |
|         size: int = 8,
 | |
|     ):
 | |
|         return self.debug_text_screen(text, pos, color, size)
 | |
| 
 | |
|     def debug_text_world(
 | |
|         self, text: str, pos: Union[Unit, Point3], color: Union[tuple, list, Point3] = None, size: int = 8
 | |
|     ):
 | |
|         """
 | |
|         Draws a text at Point3 position in the game world.
 | |
|         To grab a unit's 3d position, use unit.position3d
 | |
|         Usually the Z value of a Point3 is between 8 and 14 (except for flying units). Use self.get_terrain_z_height() from bot_ai.py to get the Z value (height) of the terrain at a 2D position.
 | |
| 
 | |
|         :param text:
 | |
|         :param color:
 | |
|         :param size:
 | |
|         """
 | |
|         if isinstance(pos, Unit):
 | |
|             pos = pos.position3d
 | |
|         assert isinstance(pos, Point3)
 | |
|         self._debug_texts.append(DrawItemWorldText(text=text, color=color, start_point=pos, font_size=size))
 | |
| 
 | |
|     def debug_text_3d(
 | |
|         self, text: str, pos: Union[Unit, Point3], color: Union[tuple, list, Point3] = None, size: int = 8
 | |
|     ):
 | |
|         return self.debug_text_world(text, pos, color, size)
 | |
| 
 | |
|     def debug_line_out(
 | |
|         self, p0: Union[Unit, Point3], p1: Union[Unit, Point3], color: Union[tuple, list, Point3] = None
 | |
|     ):
 | |
|         """
 | |
|         Draws a line from p0 to p1.
 | |
| 
 | |
|         :param p0:
 | |
|         :param p1:
 | |
|         :param color:
 | |
|         """
 | |
|         if isinstance(p0, Unit):
 | |
|             p0 = p0.position3d
 | |
|         assert isinstance(p0, Point3)
 | |
|         if isinstance(p1, Unit):
 | |
|             p1 = p1.position3d
 | |
|         assert isinstance(p1, Point3)
 | |
|         self._debug_lines.append(DrawItemLine(color=color, start_point=p0, end_point=p1))
 | |
| 
 | |
|     def debug_box_out(
 | |
|         self,
 | |
|         p_min: Union[Unit, Point3],
 | |
|         p_max: Union[Unit, Point3],
 | |
|         color: Union[tuple, list, Point3] = None,
 | |
|     ):
 | |
|         """
 | |
|         Draws a box with p_min and p_max as corners of the box.
 | |
| 
 | |
|         :param p_min:
 | |
|         :param p_max:
 | |
|         :param color:
 | |
|         """
 | |
|         if isinstance(p_min, Unit):
 | |
|             p_min = p_min.position3d
 | |
|         assert isinstance(p_min, Point3)
 | |
|         if isinstance(p_max, Unit):
 | |
|             p_max = p_max.position3d
 | |
|         assert isinstance(p_max, Point3)
 | |
|         self._debug_boxes.append(DrawItemBox(start_point=p_min, end_point=p_max, color=color))
 | |
| 
 | |
|     def debug_box2_out(
 | |
|         self,
 | |
|         pos: Union[Unit, Point3],
 | |
|         half_vertex_length: float = 0.25,
 | |
|         color: Union[tuple, list, Point3] = None,
 | |
|     ):
 | |
|         """
 | |
|         Draws a box center at a position 'pos', with box side lengths (vertices) of two times 'half_vertex_length'.
 | |
| 
 | |
|         :param pos:
 | |
|         :param half_vertex_length:
 | |
|         :param color:
 | |
|         """
 | |
|         if isinstance(pos, Unit):
 | |
|             pos = pos.position3d
 | |
|         assert isinstance(pos, Point3)
 | |
|         p0 = pos + Point3((-half_vertex_length, -half_vertex_length, -half_vertex_length))
 | |
|         p1 = pos + Point3((half_vertex_length, half_vertex_length, half_vertex_length))
 | |
|         self._debug_boxes.append(DrawItemBox(start_point=p0, end_point=p1, color=color))
 | |
| 
 | |
|     def debug_sphere_out(self, p: Union[Unit, Point3], r: float, color: Union[tuple, list, Point3] = None):
 | |
|         """
 | |
|         Draws a sphere at point p with radius r.
 | |
| 
 | |
|         :param p:
 | |
|         :param r:
 | |
|         :param color:
 | |
|         """
 | |
|         if isinstance(p, Unit):
 | |
|             p = p.position3d
 | |
|         assert isinstance(p, Point3)
 | |
|         self._debug_spheres.append(DrawItemSphere(start_point=p, radius=r, color=color))
 | |
| 
 | |
|     async def _send_debug(self):
 | |
|         """Sends the debug draw execution. This is run by main.py now automatically, if there is any items in the list. You do not need to run this manually any longer.
 | |
|         Check examples/terran/ramp_wall.py for example drawing. Each draw request needs to be sent again in every single on_step iteration.
 | |
|         """
 | |
|         debug_hash = (
 | |
|             sum(hash(item) for item in self._debug_texts),
 | |
|             sum(hash(item) for item in self._debug_lines),
 | |
|             sum(hash(item) for item in self._debug_boxes),
 | |
|             sum(hash(item) for item in self._debug_spheres),
 | |
|         )
 | |
|         if debug_hash != (0, 0, 0, 0):
 | |
|             if debug_hash != self._debug_hash_tuple_last_iteration:
 | |
|                 # Something has changed, either more or less is to be drawn, or a position of a drawing changed (e.g. when drawing on a moving unit)
 | |
|                 self._debug_hash_tuple_last_iteration = debug_hash
 | |
|                 try:
 | |
|                     await self._execute(
 | |
|                         debug=sc_pb.RequestDebug(
 | |
|                             debug=[
 | |
|                                 debug_pb.DebugCommand(
 | |
|                                     draw=debug_pb.DebugDraw(
 | |
|                                         text=[text.to_proto()
 | |
|                                               for text in self._debug_texts] if self._debug_texts else None,
 | |
|                                         lines=[line.to_proto()
 | |
|                                                for line in self._debug_lines] if self._debug_lines else None,
 | |
|                                         boxes=[box.to_proto()
 | |
|                                                for box in self._debug_boxes] if self._debug_boxes else None,
 | |
|                                         spheres=[sphere.to_proto()
 | |
|                                                  for sphere in self._debug_spheres] if self._debug_spheres else None,
 | |
|                                     )
 | |
|                                 )
 | |
|                             ]
 | |
|                         )
 | |
|                     )
 | |
|                 except ProtocolError:
 | |
|                     return
 | |
|             self._debug_draw_last_frame = True
 | |
|             self._debug_texts.clear()
 | |
|             self._debug_lines.clear()
 | |
|             self._debug_boxes.clear()
 | |
|             self._debug_spheres.clear()
 | |
|         elif self._debug_draw_last_frame:
 | |
|             # Clear drawing if we drew last frame but nothing to draw this frame
 | |
|             self._debug_hash_tuple_last_iteration = (0, 0, 0, 0)
 | |
|             await self._execute(
 | |
|                 debug=sc_pb.RequestDebug(
 | |
|                     debug=[
 | |
|                         debug_pb.DebugCommand(draw=debug_pb.DebugDraw(text=None, lines=None, boxes=None, spheres=None))
 | |
|                     ]
 | |
|                 )
 | |
|             )
 | |
|             self._debug_draw_last_frame = False
 | |
| 
 | |
|     async def debug_leave(self):
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(end_game=debug_pb.DebugEndGame())]))
 | |
| 
 | |
|     async def debug_set_unit_value(self, unit_tags: Union[Iterable[int], Units, Unit], unit_value: int, value: float):
 | |
|         """Sets a "unit value" (Energy, Life or Shields) of the given units to the given value.
 | |
|         Can't set the life of a unit to 0, use "debug_kill_unit" for that. Also can't set the life above the unit's maximum.
 | |
|         The following example sets the health of all your workers to 1:
 | |
|         await self.debug_set_unit_value(self.workers, 2, value=1)"""
 | |
|         if isinstance(unit_tags, Units):
 | |
|             unit_tags = unit_tags.tags
 | |
|         if isinstance(unit_tags, Unit):
 | |
|             unit_tags = [unit_tags.tag]
 | |
|         assert hasattr(
 | |
|             unit_tags, "__iter__"
 | |
|         ), f"unit_tags argument needs to be an iterable (list, dict, set, Units), given argument is {type(unit_tags).__name__}"
 | |
|         assert (
 | |
|             1 <= unit_value <= 3
 | |
|         ), f"unit_value needs to be between 1 and 3 (1 for energy, 2 for life, 3 for shields), given argument is {unit_value}"
 | |
|         assert all(tag > 0 for tag in unit_tags), f"Unit tags have invalid value: {unit_tags}"
 | |
|         assert isinstance(value, (int, float)), "Value needs to be of type int or float"
 | |
|         assert value >= 0, "Value can't be negative"
 | |
|         await self._execute(
 | |
|             debug=sc_pb.RequestDebug(
 | |
|                 debug=(
 | |
|                     debug_pb.DebugCommand(
 | |
|                         unit_value=debug_pb.
 | |
|                         DebugSetUnitValue(unit_value=unit_value, value=float(value), unit_tag=unit_tag)
 | |
|                     ) for unit_tag in unit_tags
 | |
|                 )
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     async def debug_hang(self, delay_in_seconds: float):
 | |
|         """ Freezes the SC2 client. Not recommended to be used. """
 | |
|         delay_in_ms = int(round(delay_in_seconds * 1000))
 | |
|         await self._execute(
 | |
|             debug=sc_pb.RequestDebug(
 | |
|                 debug=[debug_pb.DebugCommand(test_process=debug_pb.DebugTestProcess(test=1, delay_ms=delay_in_ms))]
 | |
|             )
 | |
|         )
 | |
| 
 | |
|     async def debug_show_map(self):
 | |
|         """ Reveals the whole map for the bot. Using it a second time disables it again. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=1)]))
 | |
| 
 | |
|     async def debug_control_enemy(self):
 | |
|         """ Allows control over enemy units and structures similar to team games control - does not allow the bot to spend the opponent's ressources. Using it a second time disables it again.  """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=2)]))
 | |
| 
 | |
|     async def debug_food(self):
 | |
|         """ Should disable food usage (does not seem to work?). Using it a second time disables it again.  """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=3)]))
 | |
| 
 | |
|     async def debug_free(self):
 | |
|         """ Units, structures and upgrades are free of mineral and gas cost. Using it a second time disables it again.  """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=4)]))
 | |
| 
 | |
|     async def debug_all_resources(self):
 | |
|         """ Gives 5000 minerals and 5000 vespene to the bot. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=5)]))
 | |
| 
 | |
|     async def debug_god(self):
 | |
|         """ Your units and structures no longer take any damage. Using it a second time disables it again. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=6)]))
 | |
| 
 | |
|     async def debug_minerals(self):
 | |
|         """ Gives 5000 minerals to the bot. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=7)]))
 | |
| 
 | |
|     async def debug_gas(self):
 | |
|         """ Gives 5000 vespene to the bot. This does not seem to be working. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=8)]))
 | |
| 
 | |
|     async def debug_cooldown(self):
 | |
|         """ Disables cooldowns of unit abilities for the bot. Using it a second time disables it again. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=9)]))
 | |
| 
 | |
|     async def debug_tech_tree(self):
 | |
|         """ Removes all tech requirements (e.g. can build a factory without having a barracks). Using it a second time disables it again. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=10)]))
 | |
| 
 | |
|     async def debug_upgrade(self):
 | |
|         """ Researches all currently available upgrades. E.g. using it once unlocks combat shield, stimpack and 1-1. Using it a second time unlocks 2-2 and all other upgrades stay researched. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=11)]))
 | |
| 
 | |
|     async def debug_fast_build(self):
 | |
|         """ Sets the build time of units and structures and upgrades to zero. Using it a second time disables it again. """
 | |
|         await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(game_state=12)]))
 | |
| 
 | |
|     async def quick_save(self):
 | |
|         """Saves the current game state to an in-memory bookmark.
 | |
|         See: https://github.com/Blizzard/s2client-proto/blob/eeaf5efaea2259d7b70247211dff98da0a2685a2/s2clientprotocol/sc2api.proto#L93"""
 | |
|         await self._execute(quick_save=sc_pb.RequestQuickSave())
 | |
| 
 | |
|     async def quick_load(self):
 | |
|         """Loads the game state from the previously stored in-memory bookmark.
 | |
|         Caution:
 | |
|             - The SC2 Client will crash if the game wasn't quicksaved
 | |
|             - The bot step iteration counter will not reset
 | |
|             - self.state.game_loop will be set to zero after the quickload, and self.time is dependant on it"""
 | |
|         await self._execute(quick_load=sc_pb.RequestQuickLoad())
 | |
| 
 | |
| 
 | |
| class DrawItem:
 | |
| 
 | |
|     @staticmethod
 | |
|     def to_debug_color(color: Union[tuple, Point3]):
 | |
|         """ Helper function for color conversion """
 | |
|         if color is None:
 | |
|             return debug_pb.Color(r=255, g=255, b=255)
 | |
|         # Need to check if not of type Point3 because Point3 inherits from tuple
 | |
|         if isinstance(color, (tuple, list)) and not isinstance(color, Point3) and len(color) == 3:
 | |
|             return debug_pb.Color(r=color[0], g=color[1], b=color[2])
 | |
|         # In case color is of type Point3
 | |
|         r = getattr(color, "r", getattr(color, "x", 255))
 | |
|         g = getattr(color, "g", getattr(color, "y", 255))
 | |
|         b = getattr(color, "b", getattr(color, "z", 255))
 | |
|         if max(r, g, b) <= 1:
 | |
|             r *= 255
 | |
|             g *= 255
 | |
|             b *= 255
 | |
| 
 | |
|         return debug_pb.Color(r=int(r), g=int(g), b=int(b))
 | |
| 
 | |
| 
 | |
| class DrawItemScreenText(DrawItem):
 | |
| 
 | |
|     def __init__(self, start_point: Point2 = None, color: Point3 = None, text: str = "", font_size: int = 8):
 | |
|         self._start_point: Point2 = start_point
 | |
|         self._color: Point3 = color
 | |
|         self._text: str = text
 | |
|         self._font_size: int = font_size
 | |
| 
 | |
|     def to_proto(self):
 | |
|         return debug_pb.DebugText(
 | |
|             color=self.to_debug_color(self._color),
 | |
|             text=self._text,
 | |
|             virtual_pos=self._start_point.to3.as_Point,
 | |
|             world_pos=None,
 | |
|             size=self._font_size,
 | |
|         )
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self._start_point, self._color, self._text, self._font_size))
 | |
| 
 | |
| 
 | |
| class DrawItemWorldText(DrawItem):
 | |
| 
 | |
|     def __init__(self, start_point: Point3 = None, color: Point3 = None, text: str = "", font_size: int = 8):
 | |
|         self._start_point: Point3 = start_point
 | |
|         self._color: Point3 = color
 | |
|         self._text: str = text
 | |
|         self._font_size: int = font_size
 | |
| 
 | |
|     def to_proto(self):
 | |
|         return debug_pb.DebugText(
 | |
|             color=self.to_debug_color(self._color),
 | |
|             text=self._text,
 | |
|             virtual_pos=None,
 | |
|             world_pos=self._start_point.as_Point,
 | |
|             size=self._font_size,
 | |
|         )
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self._start_point, self._text, self._font_size, self._color))
 | |
| 
 | |
| 
 | |
| class DrawItemLine(DrawItem):
 | |
| 
 | |
|     def __init__(self, start_point: Point3 = None, end_point: Point3 = None, color: Point3 = None):
 | |
|         self._start_point: Point3 = start_point
 | |
|         self._end_point: Point3 = end_point
 | |
|         self._color: Point3 = color
 | |
| 
 | |
|     def to_proto(self):
 | |
|         return debug_pb.DebugLine(
 | |
|             line=debug_pb.Line(p0=self._start_point.as_Point, p1=self._end_point.as_Point),
 | |
|             color=self.to_debug_color(self._color),
 | |
|         )
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self._start_point, self._end_point, self._color))
 | |
| 
 | |
| 
 | |
| class DrawItemBox(DrawItem):
 | |
| 
 | |
|     def __init__(self, start_point: Point3 = None, end_point: Point3 = None, color: Point3 = None):
 | |
|         self._start_point: Point3 = start_point
 | |
|         self._end_point: Point3 = end_point
 | |
|         self._color: Point3 = color
 | |
| 
 | |
|     def to_proto(self):
 | |
|         return debug_pb.DebugBox(
 | |
|             min=self._start_point.as_Point,
 | |
|             max=self._end_point.as_Point,
 | |
|             color=self.to_debug_color(self._color),
 | |
|         )
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self._start_point, self._end_point, self._color))
 | |
| 
 | |
| 
 | |
| class DrawItemSphere(DrawItem):
 | |
| 
 | |
|     def __init__(self, start_point: Point3 = None, radius: float = None, color: Point3 = None):
 | |
|         self._start_point: Point3 = start_point
 | |
|         self._radius: float = radius
 | |
|         self._color: Point3 = color
 | |
| 
 | |
|     def to_proto(self):
 | |
|         return debug_pb.DebugSphere(
 | |
|             p=self._start_point.as_Point, r=self._radius, color=self.to_debug_color(self._color)
 | |
|         )
 | |
| 
 | |
|     def __hash__(self):
 | |
|         return hash((self._start_point, self._radius, self._color))
 | 
