721 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			721 lines
		
	
	
		
			30 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from __future__ import annotations
 | 
						|
 | 
						|
from typing import Dict, Iterable, List, Optional, Set, Tuple, Union
 | 
						|
 | 
						|
from worlds._sc2common.bot import logger
 | 
						|
 | 
						|
from s2clientprotocol import debug_pb2 as debug_pb
 | 
						|
from s2clientprotocol import query_pb2 as query_pb
 | 
						|
from s2clientprotocol import raw_pb2 as raw_pb
 | 
						|
from s2clientprotocol import sc2api_pb2 as sc_pb
 | 
						|
from s2clientprotocol import spatial_pb2 as spatial_pb
 | 
						|
 | 
						|
from .data import ActionResult, ChatChannel, Race, Result, Status
 | 
						|
from .game_data import AbilityData, GameData
 | 
						|
from .game_info import GameInfo
 | 
						|
from .position import Point2, Point3
 | 
						|
from .protocol import ConnectionAlreadyClosed, Protocol, ProtocolError
 | 
						|
from .renderer import Renderer
 | 
						|
from .unit import Unit
 | 
						|
from .units import Units
 | 
						|
 | 
						|
 | 
						|
# pylint: disable=R0904
 | 
						|
class Client(Protocol):
 | 
						|
 | 
						|
    def __init__(self, ws, save_replay_path: str = None):
        """
        Initialise the client state on top of the Protocol base class.

        :param ws: connection object, forwarded unchanged to ``Protocol.__init__``
        :param save_replay_path: path the replay is written to by ``leave()``; ``None`` skips saving
        """
        super().__init__(ws)
        self.save_replay_path: Optional[str] = save_replay_path
        # Default number of frames advanced per ``step()`` call when no explicit step size is given
        self.game_step: int = 4
        self.raw_affects_selection = False
        self._renderer = None

        self._player_id = None
        self._game_result = None

        # Queued debug draw items; they are sent and emptied by ``_send_debug``
        self._debug_texts = []
        self._debug_lines = []
        self._debug_boxes = []
        self._debug_spheres = []
        # Per-queue hash sums of the last frame, used to avoid re-sending an unchanged set of draw requests
        self._debug_hash_tuple_last_iteration: Tuple[int, int, int, int] = (0, 0, 0, 0)
        self._debug_draw_last_frame = False
 | 
						|
 | 
						|
    @property
    def in_game(self) -> bool:
        """ Whether the current status is ``Status.in_game`` or ``Status.in_replay``. """
        active_states = {Status.in_game, Status.in_replay}
        return self._status in active_states
 | 
						|
 | 
						|
    async def join_game(self, name=None, race=None, observed_player_id=None, portconfig=None, rgb_render_config=None):
        """
        Send a join-game request and remember the player id returned by the server.

        :param name: optional player name (must be a ``str`` when given)
        :param race: ``Race`` to play; ``None`` joins as observer of ``observed_player_id``
        :param observed_player_id: player id to observe, required (``int``) when ``race`` is ``None``
        :param portconfig: optional object providing ``server`` and ``players`` port pairs
        :param rgb_render_config: optional dict with the keys ``"window_size"`` and ``"minimap_size"``
        :return: the player id reported in the join response
        """
        interface_options = sc_pb.InterfaceOptions(
            raw=True,
            score=True,
            show_cloaked=True,
            show_burrowed_shadows=True,
            raw_affects_selection=self.raw_affects_selection,
            raw_crop_to_playable_area=False,
            show_placeholders=True,
        )

        if rgb_render_config:
            assert isinstance(rgb_render_config, dict)
            assert "window_size" in rgb_render_config and "minimap_size" in rgb_render_config
            window_size = rgb_render_config["window_size"]
            minimap_size = rgb_render_config["minimap_size"]
            self._renderer = Renderer(self, window_size, minimap_size)
            window_w, window_h = window_size
            minimap_w, minimap_h = minimap_size

            render_options = interface_options.render
            render_options.resolution.x = window_w
            render_options.resolution.y = window_h
            render_options.minimap_resolution.x = minimap_w
            render_options.minimap_resolution.y = minimap_h

        if race is not None:
            assert isinstance(race, Race)
            request = sc_pb.RequestJoinGame(race=race.value, options=interface_options)
        else:
            # No race given: join as observer
            assert isinstance(observed_player_id, int), f"observed_player_id is of type {type(observed_player_id)}"
            request = sc_pb.RequestJoinGame(observed_player_id=observed_player_id, options=interface_options)

        if portconfig:
            server_ports = request.server_ports
            server_ports.game_port = portconfig.server[0]
            server_ports.base_port = portconfig.server[1]

            for player_ports in portconfig.players:
                client_entry = request.client_ports.add()
                client_entry.game_port = player_ports[0]
                client_entry.base_port = player_ports[1]

        if name is not None:
            assert isinstance(name, str), f"name is of type {type(name)}"
            request.player_name = name

        response = await self._execute(join_game=request)
        # A fresh game has no result yet
        self._game_result = None
        self._player_id = response.join_game.player_id
        return response.join_game.player_id
 | 
						|
 | 
						|
    async def leave(self):
        """
        Leave the current game; ``await self.client.leave()`` can be used to surrender mid-game.

        When no game result is stored yet, leaving counts as a resignation and a ``Result.Defeat``
        is recorded for this player. If a replay path is configured, the replay is saved first.
        Protocol / closed-connection errors are re-raised only in the resignation case.
        """
        resigning = self._game_result is None
        if resigning:
            # For all clients that can leave, the outcome of leaving is either a loss,
            # or the client will ignore the result
            self._game_result = {self._player_id: Result.Defeat}

        try:
            if self.save_replay_path is not None:
                await self.save_replay(self.save_replay_path)
                # Make sure the replay is only written once
                self.save_replay_path = None
            await self._execute(leave_game=sc_pb.RequestLeaveGame())
        except (ProtocolError, ConnectionAlreadyClosed):
            if not resigning:
                # The game had already ended, so a failing leave request is ignored
                return
            raise
 | 
						|
 | 
						|
    async def save_replay(self, path):
        """
        Request the replay data from the server and write it to a file.

        :param path: destination file path; the file is opened in binary write mode
        """
        logger.debug("Requesting replay from server")
        response = await self._execute(save_replay=sc_pb.RequestSaveReplay())
        with open(path, "wb") as replay_file:
            replay_file.write(response.save_replay.data)
        logger.info(f"Saved replay to {path}")
 | 
						|
 | 
						|
    async def observation(self, game_loop: int = None):
        """
        Request an observation and update the stored game result from it.

        When the game is no longer running, or the response carries player results, ``self._game_result``
        is set to a ``{player_id: Result}`` mapping. Otherwise it is reset to ``None``. Previously the
        reset ran unconditionally and wiped the mapping that had just been stored, so a finished game
        was never visible to callers.

        :param game_loop: optional game loop to pass along in the observation request
        :return: the raw response returned by ``_execute``
        """
        if game_loop is not None:
            request = sc_pb.RequestObservation(game_loop=game_loop)
        else:
            request = sc_pb.RequestObservation()
        result = await self._execute(observation=request)
        assert result.HasField("observation")

        if not self.in_game or result.observation.player_result:
            # Sometimes game ends one step before results are available
            if not result.observation.player_result:
                result = await self._execute(observation=sc_pb.RequestObservation())
                assert result.observation.player_result

            self._game_result = {pr.player_id: Result(pr.result) for pr in result.observation.player_result}
        else:
            # Game still running and no results reported: there is no result (yet)
            self._game_result = None

        # if render_data is available, then RGB rendering was requested
        if self._renderer and result.observation.observation.HasField("render_data"):
            await self._renderer.render(result.observation)

        return result
 | 
						|
 | 
						|
    async def step(self, step_size: int = None):
        """
        EXPERIMENTAL: Change self._client.game_step during the step function to increase or decrease steps per second

        :param step_size: number of frames to advance; a falsy value falls back to ``self.game_step``
        :return: the response returned by ``_execute``
        """
        frame_count = step_size if step_size else self.game_step
        request = sc_pb.RequestStep(count=frame_count)
        return await self._execute(step=request)
 | 
						|
 | 
						|
    async def get_game_data(self) -> GameData:
        """ Request ability, unit type, upgrade, buff and effect data and wrap the response in ``GameData``. """
        request = sc_pb.RequestData(ability_id=True, unit_type_id=True, upgrade_id=True, buff_id=True, effect_id=True)
        response = await self._execute(data=request)
        return GameData(response.data)
 | 
						|
 | 
						|
    async def dump_data(self, ability_id=True, unit_type_id=True, upgrade_id=True, buff_id=True, effect_id=True):
        """
        Dump the game data to a text file.

        The keyword flags select which data categories are requested. The string form of the
        response data is appended to ``data_dump.txt`` (relative path).
        Call it one time in on_step with:
        await self._client.dump_data()
        """
        request = sc_pb.RequestData(
            ability_id=ability_id,
            unit_type_id=unit_type_id,
            upgrade_id=upgrade_id,
            buff_id=buff_id,
            effect_id=effect_id,
        )
        response = await self._execute(data=request)
        with open("data_dump.txt", "a") as dump_file:
            dump_file.write(str(response.data))
 | 
						|
 | 
						|
    async def get_game_info(self) -> GameInfo:
        """ Request the game info from the server and wrap the response in ``GameInfo``. """
        response = await self._execute(game_info=sc_pb.RequestGameInfo())
        return GameInfo(response.game_info)
 | 
						|
 | 
						|
    async def query_pathing(self, start: Union[Unit, Point2, Point3],
                            end: Union[Point2, Point3]) -> Optional[Union[int, float]]:
        """Query the pathing distance from ``start`` to ``end``.

        Caution: returns "None" when the reported distance is not positive (path not found).
        Prefer ``query_pathings`` for several queries at once because the pathing query is generally slow.

        :param start: a position, or a unit (queried via its tag)
        :param end: target position
        :return: the distance as float, or ``None``"""
        assert isinstance(start, (Point2, Unit))
        assert isinstance(end, Point2)
        if isinstance(start, Point2):
            request = query_pb.RequestQueryPathing(start_pos=start.as_Point2D, end_pos=end.as_Point2D)
        else:
            request = query_pb.RequestQueryPathing(unit_tag=start.tag, end_pos=end.as_Point2D)
        response = await self._execute(query=query_pb.RequestQuery(pathing=[request]))
        distance = float(response.query.pathing[0].distance)
        return None if distance <= 0.0 else distance
 | 
						|
 | 
						|
    async def query_pathings(self, zipped_list: List[List[Union[Unit, Point2, Point3]]]) -> List[float]:
        """Usage: await self.query_pathings([[unit1, target1], [unit2, target2]])
        -> returns [distance1, distance2]
        Caution: returns 0 when path not found

        Only the first pair is type-checked; it decides whether all start entries are
        treated as positions or as units.

        :param zipped_list: non-empty list of ``[start, end]`` pairs
        """
        assert zipped_list, "No zipped_list"
        assert isinstance(zipped_list, list), f"{type(zipped_list)}"
        assert isinstance(zipped_list[0], list), f"{type(zipped_list[0])}"
        assert len(zipped_list[0]) == 2, f"{len(zipped_list[0])}"
        assert isinstance(zipped_list[0][0], (Point2, Unit)), f"{type(zipped_list[0][0])}"
        assert isinstance(zipped_list[0][1], Point2), f"{type(zipped_list[0][1])}"

        starts_are_positions = isinstance(zipped_list[0][0], Point2)
        if starts_are_positions:
            requests = (
                query_pb.RequestQueryPathing(start_pos=origin.as_Point2D, end_pos=target.as_Point2D)
                for origin, target in zipped_list
            )
        else:
            requests = (
                query_pb.RequestQueryPathing(unit_tag=origin.tag, end_pos=target.as_Point2D)
                for origin, target in zipped_list
            )
        response = await self._execute(query=query_pb.RequestQuery(pathing=requests))

        distances = []
        for entry in response.query.pathing:
            distances.append(float(entry.distance))
        return distances
 | 
						|
 | 
						|
    async def query_building_placement(
        self,
        ability: AbilityData,
        positions: List[Union[Point2, Point3]],
        ignore_resources: bool = True
    ) -> List[ActionResult]:
        """Query the placement result of ``ability`` at each of the given positions.

        :param ability: ability whose id is used for the placement requests
        :param positions: positions to test
        :param ignore_resources: forwarded as ``ignore_resource_requirements``
        :return: one ``ActionResult`` per placement entry in the response"""
        assert isinstance(ability, AbilityData)
        placement_requests = (
            query_pb.RequestQueryBuildingPlacement(ability_id=ability.id.value, target_pos=position.as_Point2D)
            for position in positions
        )
        query = query_pb.RequestQuery(
            placements=placement_requests,
            ignore_resource_requirements=ignore_resources,
        )
        response = await self._execute(query=query)
        # Unnecessary converting to ActionResult?
        return [ActionResult(placement.result) for placement in response.query.placements]
 | 
						|
 | 
						|
    async def chat_send(self, message: str, team_only: bool):
        """
        Writes a message to the chat.

        :param message: text to send
        :param team_only: truthy sends on the team channel, otherwise the message is broadcast
        """
        if team_only:
            channel = ChatChannel.Team
        else:
            channel = ChatChannel.Broadcast
        chat_action = sc_pb.Action(action_chat=sc_pb.ActionChat(channel=channel.value, message=message))
        await self._execute(action=sc_pb.RequestAction(actions=[chat_action]))
 | 
						|
 | 
						|
    async def debug_kill_unit(self, unit_tags: Union[Unit, Units, List[int], Set[int]]):
        """
        Send a debug command that kills the given units.

        :param unit_tags: a ``Unit``, a ``Units`` collection, or a non-empty collection of unit tags
        """
        if isinstance(unit_tags, Units):
            unit_tags = unit_tags.tags
        if isinstance(unit_tags, Unit):
            unit_tags = [unit_tags.tag]
        assert unit_tags

        kill_command = debug_pb.DebugCommand(kill_unit=debug_pb.DebugKillUnit(tag=unit_tags))
        await self._execute(debug=sc_pb.RequestDebug(debug=[kill_command]))
 | 
						|
 | 
						|
    async def move_camera(self, position: Union[Unit, Units, Point2, Point3]):
        """Moves camera to the target position

        :param position: a point, a unit (its position is used) or a ``Units`` collection (its center is used)"""
        assert isinstance(position, (Unit, Units, Point2, Point3))
        if isinstance(position, Units):
            position = position.center
        if isinstance(position, Unit):
            position = position.position

        camera_move = raw_pb.ActionRawCameraMove(center_world_space=position.to3.as_Point)
        camera_action = sc_pb.Action(action_raw=raw_pb.ActionRaw(camera_move=camera_move))
        await self._execute(action=sc_pb.RequestAction(actions=[camera_action]))
 | 
						|
 | 
						|
    async def obs_move_camera(self, position: Union[Unit, Units, Point2, Point3]):
        """Moves observer camera to the target position. Only works when observing (e.g. watching the replay).

        :param position: a point, a unit (its position is used) or a ``Units`` collection (its center is used)"""
        assert isinstance(position, (Unit, Units, Point2, Point3))
        if isinstance(position, Units):
            position = position.center
        if isinstance(position, Unit):
            position = position.position

        camera_move = sc_pb.ActionObserverCameraMove(world_pos=position.as_Point2D)
        observer_action = sc_pb.ObserverAction(camera_move=camera_move)
        await self._execute(obs_action=sc_pb.RequestObserverAction(actions=[observer_action]))
 | 
						|
 | 
						|
    async def move_camera_spatial(self, position: Union[Point2, Point3]):
        """Moves camera to the target position using the spatial action interface

        :param position: target point, sent as the minimap center of the camera move"""
        assert isinstance(position, (Point2, Point3))
        camera_move = spatial_pb.ActionSpatialCameraMove(center_minimap=position.as_PointI)
        spatial_action = sc_pb.Action(action_render=spatial_pb.ActionSpatial(camera_move=camera_move))
        await self._execute(action=sc_pb.RequestAction(actions=[spatial_action]))
 | 
						|
 | 
						|
    def debug_text_simple(self, text: str):
        """
        Draws a text in the top left corner of the screen (up to a max of 6 messages fit there).

        The text is queued at screen position (0, 0) with font size 8 and no explicit color.
        """
        top_left = Point2((0, 0))
        item = DrawItemScreenText(text=text, color=None, start_point=top_left, font_size=8)
        self._debug_texts.append(item)
 | 
						|
 | 
						|
    def debug_text_screen(
        self,
        text: str,
        pos: Union[Point2, Point3, tuple, list],
        color: Union[tuple, list, Point3] = None,
        size: int = 8,
    ):
        """
        Queues a text to be drawn on the screen (monitor / game window) with coordinates 0 <= x, y <= 1.

        :param text: text to draw
        :param pos: sequence whose first two entries are the x and y screen coordinates
        :param color: optional color of the text
        :param size: font size
        """
        assert len(pos) >= 2
        x, y = pos[0], pos[1]
        assert 0 <= x <= 1
        assert 0 <= y <= 1
        screen_point = Point2((x, y))
        item = DrawItemScreenText(text=text, color=color, start_point=screen_point, font_size=size)
        self._debug_texts.append(item)
 | 
						|
 | 
						|
    def debug_text_2d(
        self,
        text: str,
        pos: Union[Point2, Point3, tuple, list],
        color: Union[tuple, list, Point3] = None,
        size: int = 8,
    ):
        """ Alias of ``debug_text_screen``; all arguments are forwarded unchanged. """
        forwarded = (text, pos, color, size)
        return self.debug_text_screen(*forwarded)
 | 
						|
 | 
						|
    def debug_text_world(
        self, text: str, pos: Union[Unit, Point3], color: Union[tuple, list, Point3] = None, size: int = 8
    ):
        """
        Queues a text to be drawn at a Point3 position in the game world.
        To grab a unit's 3d position, use unit.position3d
        Usually the Z value of a Point3 is between 8 and 14 (except for flying units). Use self.get_terrain_z_height() from bot_ai.py to get the Z value (height) of the terrain at a 2D position.

        :param text: text to draw
        :param pos: world position, or a unit whose ``position3d`` is used
        :param color: optional color of the text
        :param size: font size
        """
        world_point = pos.position3d if isinstance(pos, Unit) else pos
        assert isinstance(world_point, Point3)
        item = DrawItemWorldText(text=text, color=color, start_point=world_point, font_size=size)
        self._debug_texts.append(item)
 | 
						|
 | 
						|
    def debug_text_3d(
        self, text: str, pos: Union[Unit, Point3], color: Union[tuple, list, Point3] = None, size: int = 8
    ):
        """ Alias of ``debug_text_world``; all arguments are forwarded unchanged. """
        forwarded = (text, pos, color, size)
        return self.debug_text_world(*forwarded)
 | 
						|
 | 
						|
    def debug_line_out(
        self, p0: Union[Unit, Point3], p1: Union[Unit, Point3], color: Union[tuple, list, Point3] = None
    ):
        """
        Queues a line from p0 to p1 to be drawn.

        :param p0: start point, or a unit whose ``position3d`` is used
        :param p1: end point, or a unit whose ``position3d`` is used
        :param color: optional color of the line
        """
        line_start = p0.position3d if isinstance(p0, Unit) else p0
        assert isinstance(line_start, Point3)
        line_end = p1.position3d if isinstance(p1, Unit) else p1
        assert isinstance(line_end, Point3)
        item = DrawItemLine(color=color, start_point=line_start, end_point=line_end)
        self._debug_lines.append(item)
 | 
						|
 | 
						|
    def debug_box_out(
        self,
        p_min: Union[Unit, Point3],
        p_max: Union[Unit, Point3],
        color: Union[tuple, list, Point3] = None,
    ):
        """
        Queues a box with p_min and p_max as corners of the box.

        :param p_min: first corner, or a unit whose ``position3d`` is used
        :param p_max: opposite corner, or a unit whose ``position3d`` is used
        :param color: optional color of the box
        """
        corner_a = p_min.position3d if isinstance(p_min, Unit) else p_min
        assert isinstance(corner_a, Point3)
        corner_b = p_max.position3d if isinstance(p_max, Unit) else p_max
        assert isinstance(corner_b, Point3)
        item = DrawItemBox(start_point=corner_a, end_point=corner_b, color=color)
        self._debug_boxes.append(item)
 | 
						|
 | 
						|
    def debug_box2_out(
        self,
        pos: Union[Unit, Point3],
        half_vertex_length: float = 0.25,
        color: Union[tuple, list, Point3] = None,
    ):
        """
        Queues a box centered at position 'pos', with box side lengths (vertices) of two times 'half_vertex_length'.

        :param pos: center of the box, or a unit whose ``position3d`` is used
        :param half_vertex_length: half of the side length of the box
        :param color: optional color of the box
        """
        center = pos.position3d if isinstance(pos, Unit) else pos
        assert isinstance(center, Point3)
        half = half_vertex_length
        low_corner = center + Point3((-half, -half, -half))
        high_corner = center + Point3((half, half, half))
        item = DrawItemBox(start_point=low_corner, end_point=high_corner, color=color)
        self._debug_boxes.append(item)
 | 
						|
 | 
						|
    def debug_sphere_out(self, p: Union[Unit, Point3], r: float, color: Union[tuple, list, Point3] = None):
        """
        Queues a sphere at point p with radius r to be drawn.

        :param p: center of the sphere, or a unit whose ``position3d`` is used
        :param r: radius of the sphere
        :param color: optional color of the sphere
        """
        center = p.position3d if isinstance(p, Unit) else p
        assert isinstance(center, Point3)
        item = DrawItemSphere(start_point=center, radius=r, color=color)
        self._debug_spheres.append(item)
 | 
						|
 | 
						|
    async def _send_debug(self):
        """Sends the queued debug draw items. This is run by main.py now automatically, if there is any items in the list. You do not need to run this manually any longer.
        Check examples/terran/ramp_wall.py for example drawing. Each draw request needs to be sent again in every single on_step iteration.

        The four queues are hashed; a draw request is only sent when the hash tuple differs from
        the one of the previous call. When nothing is queued but something was drawn last frame,
        an empty draw command is sent instead.
        """
        nothing_queued = (0, 0, 0, 0)
        debug_hash = (
            sum(hash(entry) for entry in self._debug_texts),
            sum(hash(entry) for entry in self._debug_lines),
            sum(hash(entry) for entry in self._debug_boxes),
            sum(hash(entry) for entry in self._debug_spheres),
        )

        if debug_hash == nothing_queued:
            if self._debug_draw_last_frame:
                # Clear drawing if we drew last frame but nothing to draw this frame
                self._debug_hash_tuple_last_iteration = (0, 0, 0, 0)
                empty_draw = debug_pb.DebugDraw(text=None, lines=None, boxes=None, spheres=None)
                await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(draw=empty_draw)]))
                self._debug_draw_last_frame = False
            return

        if debug_hash != self._debug_hash_tuple_last_iteration:
            # Something has changed, either more or less is to be drawn, or a position of a drawing changed (e.g. when drawing on a moving unit)
            self._debug_hash_tuple_last_iteration = debug_hash
            try:
                draw = debug_pb.DebugDraw(
                    text=[item.to_proto() for item in self._debug_texts] if self._debug_texts else None,
                    lines=[item.to_proto() for item in self._debug_lines] if self._debug_lines else None,
                    boxes=[item.to_proto() for item in self._debug_boxes] if self._debug_boxes else None,
                    spheres=[item.to_proto() for item in self._debug_spheres] if self._debug_spheres else None,
                )
                await self._execute(debug=sc_pb.RequestDebug(debug=[debug_pb.DebugCommand(draw=draw)]))
            except ProtocolError:
                # On a protocol error the queues are left untouched
                return

        self._debug_draw_last_frame = True
        self._debug_texts.clear()
        self._debug_lines.clear()
        self._debug_boxes.clear()
        self._debug_spheres.clear()
 | 
						|
 | 
						|
    async def debug_leave(self):
        """ Send a debug command carrying an (empty) ``DebugEndGame`` message. """
        end_game_command = debug_pb.DebugCommand(end_game=debug_pb.DebugEndGame())
        await self._execute(debug=sc_pb.RequestDebug(debug=[end_game_command]))
 | 
						|
 | 
						|
    async def debug_set_unit_value(self, unit_tags: Union[Iterable[int], Units, Unit], unit_value: int, value: float):
        """Sets a "unit value" (Energy, Life or Shields) of the given units to the given value.
        Can't set the life of a unit to 0, use "debug_kill_unit" for that. Also can't set the life above the unit's maximum.
        The following example sets the health of all your workers to 1:
        await self.debug_set_unit_value(self.workers, 2, value=1)

        :param unit_tags: a ``Unit``, a ``Units`` collection, or any iterable of unit tags (one-shot iterators are supported)
        :param unit_value: 1 for energy, 2 for life, 3 for shields
        :param value: non-negative number to set"""
        if isinstance(unit_tags, Units):
            unit_tags = unit_tags.tags
        if isinstance(unit_tags, Unit):
            unit_tags = [unit_tags.tag]
        assert hasattr(
            unit_tags, "__iter__"
        ), f"unit_tags argument needs to be an iterable (list, dict, set, Units), given argument is {type(unit_tags).__name__}"
        # Materialize the tags once: the validation below and the request construction both iterate
        # them, and a generator / iterator would otherwise be exhausted by the validation alone,
        # resulting in an empty request.
        unit_tags = list(unit_tags)
        assert (
            1 <= unit_value <= 3
        ), f"unit_value needs to be between 1 and 3 (1 for energy, 2 for life, 3 for shields), given argument is {unit_value}"
        assert all(tag > 0 for tag in unit_tags), f"Unit tags have invalid value: {unit_tags}"
        assert isinstance(value, (int, float)), "Value needs to be of type int or float"
        assert value >= 0, "Value can't be negative"
        await self._execute(
            debug=sc_pb.RequestDebug(
                debug=(
                    debug_pb.DebugCommand(
                        unit_value=debug_pb.DebugSetUnitValue(
                            unit_value=unit_value, value=float(value), unit_tag=unit_tag
                        )
                    ) for unit_tag in unit_tags
                )
            )
        )
 | 
						|
 | 
						|
    async def debug_hang(self, delay_in_seconds: float):
        """
        Freezes the SC2 client. Not recommended to be used.

        :param delay_in_seconds: delay, converted to whole milliseconds (rounded) for the request
        """
        delay_in_ms = int(round(delay_in_seconds * 1000))
        test_process = debug_pb.DebugTestProcess(test=1, delay_ms=delay_in_ms)
        hang_command = debug_pb.DebugCommand(test_process=test_process)
        await self._execute(debug=sc_pb.RequestDebug(debug=[hang_command]))
 | 
						|
 | 
						|
    async def debug_show_map(self):
        """ Reveals the whole map for the bot (debug game state 1). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=1)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_control_enemy(self):
        """ Allows control over enemy units and structures similar to team games control (debug game state 2) - does not allow the bot to spend the opponent's resources. Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=2)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_food(self):
        """ Should disable food usage (debug game state 3; does not seem to work?). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=3)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_free(self):
        """ Units, structures and upgrades are free of mineral and gas cost (debug game state 4). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=4)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_all_resources(self):
        """ Gives 5000 minerals and 5000 vespene to the bot (debug game state 5). """
        command = debug_pb.DebugCommand(game_state=5)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_god(self):
        """ Your units and structures no longer take any damage (debug game state 6). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=6)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_minerals(self):
        """ Gives 5000 minerals to the bot (debug game state 7). """
        command = debug_pb.DebugCommand(game_state=7)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_gas(self):
        """ Gives 5000 vespene to the bot (debug game state 8). This does not seem to be working. """
        command = debug_pb.DebugCommand(game_state=8)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_cooldown(self):
        """ Disables cooldowns of unit abilities for the bot (debug game state 9). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=9)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_tech_tree(self):
        """ Removes all tech requirements, e.g. can build a factory without having a barracks (debug game state 10). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=10)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_upgrade(self):
        """ Researches all currently available upgrades (debug game state 11). E.g. using it once unlocks combat shield, stimpack and 1-1. Using it a second time unlocks 2-2 and all other upgrades stay researched. """
        command = debug_pb.DebugCommand(game_state=11)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def debug_fast_build(self):
        """ Sets the build time of units and structures and upgrades to zero (debug game state 12). Using it a second time disables it again. """
        command = debug_pb.DebugCommand(game_state=12)
        await self._execute(debug=sc_pb.RequestDebug(debug=[command]))
 | 
						|
 | 
						|
    async def quick_save(self):
        """Saves the current game state to an in-memory bookmark.

        See: https://github.com/Blizzard/s2client-proto/blob/eeaf5efaea2259d7b70247211dff98da0a2685a2/s2clientprotocol/sc2api.proto#L93"""
        request = sc_pb.RequestQuickSave()
        await self._execute(quick_save=request)
 | 
						|
 | 
						|
    async def quick_load(self):
        """Loads the game state from the previously stored in-memory bookmark.

        Caution:
            - The SC2 Client will crash if the game wasn't quicksaved
            - The bot step iteration counter will not reset
            - self.state.game_loop will be set to zero after the quickload, and self.time is dependent on it"""
        request = sc_pb.RequestQuickLoad()
        await self._execute(quick_load=request)
 | 
						|
 | 
						|
 | 
						|
class DrawItem:
    """Base class of the debug draw items; provides the shared color conversion helper."""

    @staticmethod
    def to_debug_color(color: Union[tuple, Point3]):
        """Convert ``color`` into a ``debug_pb.Color`` message.

        Conversion rules, checked in this order:
            - ``None`` results in white (255, 255, 255).
            - A plain tuple or list with exactly three entries is passed on unchanged as r, g, b.
            - Any other object is read via its ``r``/``g``/``b`` attributes, falling back to
              ``x``/``y``/``z`` and finally to 255. If none of the three channels is greater
              than 1, all of them are multiplied by 255. The channels are then truncated with ``int``.

        :param color: the color to convert, or None for white
        """
        if color is None:
            return debug_pb.Color(r=255, g=255, b=255)

        # Point3 inherits from tuple, so it must be ruled out explicitly before indexing
        is_plain_sequence = isinstance(color, (tuple, list)) and not isinstance(color, Point3)
        if is_plain_sequence and len(color) == 3:
            return debug_pb.Color(r=color[0], g=color[1], b=color[2])

        # Attribute based colors, e.g. Point3: prefer r/g/b, otherwise use x/y/z, otherwise 255
        channels = [
            getattr(color, primary, getattr(color, fallback, 255))
            for primary, fallback in (("r", "x"), ("g", "y"), ("b", "z"))
        ]
        # NOTE(review): values that never exceed 1 are presumably meant as 0..1 fractions - confirm with callers
        if max(channels) <= 1:
            channels = [channel * 255 for channel in channels]

        red, green, blue = channels
        return debug_pb.Color(r=int(red), g=int(green), b=int(blue))
 | 
						|
 | 
						|
 | 
						|
class DrawItemScreenText(DrawItem):
    """Debug text item that is positioned through the ``virtual_pos`` field of ``DebugText``."""

    def __init__(self, start_point: Point2 = None, color: Point3 = None, text: str = "", font_size: int = 8):
        """
        :param start_point: 2D position of the text, converted via ``to3`` when building the proto
        :param color: text color, see :meth:`DrawItem.to_debug_color` for the accepted forms
        :param text: the string to display
        :param font_size: size value forwarded to the proto message
        """
        self._start_point: Point2 = start_point
        self._color: Point3 = color
        self._text: str = text
        self._font_size: int = font_size

    def to_proto(self):
        """Build the ``debug_pb.DebugText`` message for this item; ``world_pos`` stays unset."""
        text_color = self.to_debug_color(self._color)
        screen_position = self._start_point.to3.as_Point
        return debug_pb.DebugText(
            color=text_color,
            text=self._text,
            virtual_pos=screen_position,
            world_pos=None,
            size=self._font_size,
        )

    def __hash__(self):
        """Hash over position, color, text and font size."""
        fingerprint = (self._start_point, self._color, self._text, self._font_size)
        return hash(fingerprint)
 | 
						|
 | 
						|
 | 
						|
class DrawItemWorldText(DrawItem):
    """Debug text item that is positioned through the ``world_pos`` field of ``DebugText``."""

    def __init__(self, start_point: Point3 = None, color: Point3 = None, text: str = "", font_size: int = 8):
        """
        :param start_point: 3D position of the text
        :param color: text color, see :meth:`DrawItem.to_debug_color` for the accepted forms
        :param text: the string to display
        :param font_size: size value forwarded to the proto message
        """
        self._start_point: Point3 = start_point
        self._color: Point3 = color
        self._text: str = text
        self._font_size: int = font_size

    def to_proto(self):
        """Build the ``debug_pb.DebugText`` message for this item; ``virtual_pos`` stays unset."""
        text_color = self.to_debug_color(self._color)
        world_position = self._start_point.as_Point
        return debug_pb.DebugText(
            color=text_color,
            text=self._text,
            virtual_pos=None,
            world_pos=world_position,
            size=self._font_size,
        )

    def __hash__(self):
        """Hash over position, text, font size and color."""
        fingerprint = (self._start_point, self._text, self._font_size, self._color)
        return hash(fingerprint)
 | 
						|
 | 
						|
 | 
						|
class DrawItemLine(DrawItem):
    """Debug line item running from a start point to an end point."""

    def __init__(self, start_point: Point3 = None, end_point: Point3 = None, color: Point3 = None):
        """
        :param start_point: first end of the line
        :param end_point: second end of the line
        :param color: line color, see :meth:`DrawItem.to_debug_color` for the accepted forms
        """
        self._start_point: Point3 = start_point
        self._end_point: Point3 = end_point
        self._color: Point3 = color

    def to_proto(self):
        """Build the ``debug_pb.DebugLine`` message for this item."""
        segment = debug_pb.Line(p0=self._start_point.as_Point, p1=self._end_point.as_Point)
        line_color = self.to_debug_color(self._color)
        return debug_pb.DebugLine(line=segment, color=line_color)

    def __hash__(self):
        """Hash over start point, end point and color."""
        fingerprint = (self._start_point, self._end_point, self._color)
        return hash(fingerprint)
 | 
						|
 | 
						|
 | 
						|
class DrawItemBox(DrawItem):
    """Debug box item described by two corner points."""

    def __init__(self, start_point: Point3 = None, end_point: Point3 = None, color: Point3 = None):
        """
        :param start_point: corner that is sent as ``min`` of the box
        :param end_point: corner that is sent as ``max`` of the box
        :param color: box color, see :meth:`DrawItem.to_debug_color` for the accepted forms
        """
        self._start_point: Point3 = start_point
        self._end_point: Point3 = end_point
        self._color: Point3 = color

    def to_proto(self):
        """Build the ``debug_pb.DebugBox`` message for this item."""
        lower_corner = self._start_point.as_Point
        upper_corner = self._end_point.as_Point
        box_color = self.to_debug_color(self._color)
        return debug_pb.DebugBox(min=lower_corner, max=upper_corner, color=box_color)

    def __hash__(self):
        """Hash over both corner points and the color."""
        fingerprint = (self._start_point, self._end_point, self._color)
        return hash(fingerprint)
 | 
						|
 | 
						|
 | 
						|
class DrawItemSphere(DrawItem):
    """Debug sphere item described by a center point and a radius."""

    def __init__(self, start_point: Point3 = None, radius: float = None, color: Point3 = None):
        """
        :param start_point: center of the sphere
        :param radius: radius of the sphere
        :param color: sphere color, see :meth:`DrawItem.to_debug_color` for the accepted forms
        """
        self._start_point: Point3 = start_point
        self._radius: float = radius
        self._color: Point3 = color

    def to_proto(self):
        """Build the ``debug_pb.DebugSphere`` message for this item."""
        center = self._start_point.as_Point
        sphere_color = self.to_debug_color(self._color)
        return debug_pb.DebugSphere(p=center, r=self._radius, color=sphere_color)

    def __hash__(self):
        """Hash over center point, radius and color."""
        fingerprint = (self._start_point, self._radius, self._color)
        return hash(fingerprint)
 |