Files
Grinch-AP/worlds/sc2/client.py
Phaneros 92ff0ddba8 SC2: Launcher bugfixes after content merge (#5409)
* sc2: Fixing Launcher.py launch not properly handling command-line arguments

* sc2: Fixing some old option names in webhost

* sc2: Switching to common client url parameter handling
2025-09-30 18:34:26 +02:00

2349 lines
116 KiB
Python

from __future__ import annotations
import asyncio
import collections
import copy
import ctypes
import enum
import functools
import inspect
import logging
import multiprocessing
import os.path
import re
import sys
import tempfile
import typing
import queue
import zipfile
import io
import random
import concurrent.futures
import time
import uuid
import argparse
from pathlib import Path
# CommonClient import first to trigger ModuleUpdater
from CommonClient import CommonContext, server_loop, ClientCommandProcessor, gui_enabled, get_base_parser, handle_url_arg
from Utils import init_logging, is_windows, async_start
from .item import item_names, item_parents, race_to_item_type
from .item.item_annotations import ITEM_NAME_ANNOTATIONS
from .item.item_groups import item_name_groups, unlisted_item_name_groups, ItemGroupNames
from . import options, VICTORY_MODULO
from .options import (
MissionOrder, KerriganPrimalStatus, kerrigan_unit_available, KerriganPresence, EnableMorphling, GameDifficulty,
GameSpeed, GenericUpgradeItems, GenericUpgradeResearch, ColorChoice, GenericUpgradeMissions, MaxUpgradeLevel,
LocationInclusion, ExtraLocations, MasteryLocations, SpeedrunLocations, PreventativeLocations, ChallengeLocations,
VanillaLocations,
DisableForcedCamera, SkipCutscenes, GrantStoryTech, GrantStoryLevels, TakeOverAIAllies, RequiredTactics,
SpearOfAdunPresence, SpearOfAdunPresentInNoBuild, SpearOfAdunPassiveAbilityPresence,
SpearOfAdunPassivesPresentInNoBuild, EnableVoidTrade, VoidTradeAgeLimit, void_trade_age_limits_ms, VoidTradeWorkers,
DifficultyDamageModifier, MissionOrderScouting, GenericUpgradeResearchSpeedup, MercenaryHighlanders, WarCouncilNerfs,
is_mission_in_soa_presence,
)
from .mission_order.slot_data import CampaignSlotData, LayoutSlotData, MissionSlotData, MissionOrderObjectSlotData
from .mission_order.entry_rules import SubRuleRuleData, CountMissionsRuleData, MissionEntryRules
from .mission_tables import MissionFlag
from .transfer_data import normalized_unit_types, worker_units
from . import SC2World
if __name__ == "__main__":
init_logging("SC2Client", exception_logger="Client")
logger = logging.getLogger("Client")
sc2_logger = logging.getLogger("Starcraft2")
import nest_asyncio
from worlds._sc2common import bot
from worlds._sc2common.bot.data import Race
from worlds._sc2common.bot.main import run_game
from worlds._sc2common.bot.player import Bot
from .item.item_tables import (
lookup_id_to_name, get_full_item_list, ItemData,
ZergItemType, upgrade_bundles,
WEAPON_ARMOR_UPGRADE_MAX_LEVEL,
)
from .locations import SC2WOL_LOC_ID_OFFSET, LocationType, LocationFlag, SC2HOTS_LOC_ID_OFFSET, VICTORY_CACHE_OFFSET
from .mission_tables import (
lookup_id_to_mission, SC2Campaign, MissionInfo,
lookup_id_to_campaign, SC2Mission, campaign_mission_table, SC2Race
)
import colorama
from .options import Option, upgrade_included_names
from NetUtils import ClientStatus, NetworkItem, JSONtoTextParser, JSONMessagePart, add_json_item, add_json_location, add_json_text, JSONTypes
from MultiServer import mark_raw
pool = concurrent.futures.ThreadPoolExecutor(1)
loop = asyncio.get_event_loop_policy().new_event_loop()
nest_asyncio.apply(loop)
MAX_BONUS: int = 28
# GitHub repo where the Map/mod data is hosted for /download_data command
DATA_REPO_OWNER = "Ziktofel"
DATA_REPO_NAME = "Archipelago-SC2-data"
DATA_API_VERSION = "API4"
# Bot controller
CONTROLLER_HEALTH: int = 38281
CONTROLLER2_HEALTH: int = 38282
# Void Trade
TRADE_UNIT = "AP_TradeStructure" # ID of the unit
TRADE_SEND_BUTTON = "AP_TradeStructureDummySend" # ID of the button
TRADE_RECEIVE_1_BUTTON = "AP_TradeStructureDummyReceive" # ID of the button
TRADE_RECEIVE_5_BUTTON = "AP_TradeStructureDummyReceive5" # ID of the button
TRADE_DATASTORAGE_TEAM = "SC2_VoidTrade_" # + Team
TRADE_DATASTORAGE_SLOT = "slot_" # + Slot
TRADE_DATASTORAGE_LOCK = "_lock"
TRADE_LOCK_TIME = 5 # Time in seconds that the DataStorage may be considered safe to edit
TRADE_LOCK_WAIT_LIMIT = 540000 / 1.4 # Time in ms that the client may spend trying to get a lock (540000 = 9 minutes, 1.4 is 'faster' game speed's time scale)
# Games
STARCRAFT2 = "Starcraft 2"
STARCRAFT2_WOL = "Starcraft 2 Wings of Liberty"
# Data version file path.
# This file is used to tell if the downloaded data are outdated
# Associated with /download_data command
def get_metadata_file() -> str:
return os.environ["SC2PATH"] + os.sep + "ArchipelagoSC2Metadata.txt"
def _remap_color_option(slot_data_version: int, color: int) -> int:
"""Remap colour options for backwards compatibility with older slot data"""
if slot_data_version < 4 and color == ColorChoice.option_mengsk:
return ColorChoice.option_default
return color
class ConfigurableOptionType(enum.Enum):
INTEGER = enum.auto()
ENUM = enum.auto()
class ConfigurableOptionInfo(typing.NamedTuple):
name: str
variable_name: str
option_class: typing.Type[Option]
option_type: ConfigurableOptionType = ConfigurableOptionType.ENUM
can_break_logic: bool = False
class ColouredMessage:
def __init__(self, text: str = '', *, keep_markup: bool = False) -> None:
self.parts: typing.List[dict] = []
if text:
self(text, keep_markup=keep_markup)
def __call__(self, text: str, *, keep_markup: bool = False) -> 'ColouredMessage':
add_json_text(self.parts, text, keep_markup=keep_markup)
return self
def coloured(self, text: str, colour: str, *, keep_markup: bool = False) -> 'ColouredMessage':
add_json_text(self.parts, text, type="color", color=colour, keep_markup=keep_markup)
return self
def location(self, location_id: int, player_id: int) -> 'ColouredMessage':
add_json_location(self.parts, location_id, player_id)
return self
def item(self, item_id: int, player_id: int, flags: int = 0) -> 'ColouredMessage':
add_json_item(self.parts, item_id, player_id, flags)
return self
def player(self, player_id: int) -> 'ColouredMessage':
add_json_text(self.parts, str(player_id), type=JSONTypes.player_id)
return self
def send(self, ctx: SC2Context) -> None:
ctx.on_print_json({"data": self.parts, "cmd": "PrintJSON"})
class StarcraftClientProcessor(ClientCommandProcessor):
ctx: SC2Context
def formatted_print(self, text: str) -> None:
"""Prints with kivy formatting to the GUI, and also prints to command-line and to all logs"""
# Note(mm): Bold/underline can help readability, but unfortunately the CommonClient does not filter bold tags from command-line output.
# Regardless, using `on_print_json` to get formatted text in the GUI and output in the command-line and in the logs,
# without having to branch code from CommonClient
self.ctx.on_print_json({"data": [{"text": text, "keep_markup": True}]})
def _cmd_difficulty(self, difficulty: str = "") -> bool:
"""Overrides the current difficulty set for the world. Takes the argument casual, normal, hard, or brutal"""
arguments = difficulty.split()
num_arguments = len(arguments)
if num_arguments > 0:
difficulty_choice = arguments[0].lower()
if difficulty_choice == "casual":
self.ctx.difficulty_override = 0
elif difficulty_choice == "normal":
self.ctx.difficulty_override = 1
elif difficulty_choice == "hard":
self.ctx.difficulty_override = 2
elif difficulty_choice == "brutal":
self.ctx.difficulty_override = 3
else:
self.output("Unable to parse difficulty '" + arguments[0] + "'")
return False
self.output("Difficulty set to " + arguments[0])
return True
else:
if self.ctx.difficulty == -1:
self.output("Please connect to a seed before checking difficulty.")
else:
current_difficulty = self.ctx.difficulty
if self.ctx.difficulty_override >= 0:
current_difficulty = self.ctx.difficulty_override
self.output("Current difficulty: " + ["Casual", "Normal", "Hard", "Brutal"][current_difficulty])
self.output("To change the difficulty, add the name of the difficulty after the command.")
return False
def _cmd_game_speed(self, game_speed: str = "") -> bool:
"""Overrides the current game speed for the world.
Takes the arguments default, slower, slow, normal, fast, faster"""
arguments = game_speed.split()
num_arguments = len(arguments)
if num_arguments > 0:
speed_choice = arguments[0].lower()
if speed_choice == "default":
self.ctx.game_speed_override = 0
elif speed_choice == "slower":
self.ctx.game_speed_override = 1
elif speed_choice == "slow":
self.ctx.game_speed_override = 2
elif speed_choice == "normal":
self.ctx.game_speed_override = 3
elif speed_choice == "fast":
self.ctx.game_speed_override = 4
elif speed_choice == "faster":
self.ctx.game_speed_override = 5
else:
self.output("Unable to parse game speed '" + arguments[0] + "'")
return False
self.output("Game speed set to " + arguments[0])
return True
else:
if self.ctx.game_speed == -1:
self.output("Please connect to a seed before checking game speed.")
else:
current_speed = self.ctx.game_speed
if self.ctx.game_speed_override >= 0:
current_speed = self.ctx.game_speed_override
self.output("Current game speed: "
+ ["Default", "Slower", "Slow", "Normal", "Fast", "Faster"][current_speed])
self.output("To change the game speed, add the name of the speed after the command,"
" or Default to select based on difficulty.")
return False
@mark_raw
def _cmd_received(self, filter_search: str = "") -> bool:
"""List received items.
Pass in a parameter to filter the search by partial item name or exact item group.
Use '/received recent <number>' to list the last 'number' items received (default 20)."""
if self.ctx.slot is None:
self.formatted_print("Connect to a slot to view what items are received.")
return True
if filter_search.casefold().startswith('recent'):
return self._received_recent(filter_search[len('recent'):].strip())
# Groups must be matched case-sensitively, so we properly capitalize the search term
# eg. "Spear of Adun" over "Spear Of Adun" or "spear of adun"
# This fails a lot of item name matches, but those should be found by partial name match
group_filter = ''
for group_name in item_name_groups:
if group_name in unlisted_item_name_groups:
continue
if filter_search.casefold() == group_name.casefold():
group_filter = group_name
break
def item_matches_filter(item_name: str) -> bool:
# The filter can be an exact group name or a partial item name
# Partial item name can be matched case-insensitively
if filter_search.casefold() in item_name.casefold():
return True
# The search term should already be formatted as a group name
if group_filter and item_name in item_name_groups[group_filter]:
return True
return False
items = get_full_item_list()
categorized_items: typing.Dict[SC2Race, typing.List[typing.Union[int, str]]] = {}
parent_to_child: typing.Dict[typing.Union[int, str], typing.List[int]] = {}
items_received: typing.Dict[int, typing.List[NetworkItem]] = {}
for item in self.ctx.items_received:
items_received.setdefault(item.item, []).append(item)
items_received_set = set(items_received)
for item_data in items.values():
if item_data.parent:
parent_rule = item_parents.parent_present[item_data.parent]
if parent_rule.constraint_group is not None and parent_rule.constraint_group in items:
parent_to_child.setdefault(items[parent_rule.constraint_group].code, []).append(item_data.code)
continue
race = items[parent_rule.parent_items()[0]].race
categorized_items.setdefault(race, [])
if parent_rule.display_string not in categorized_items[race]:
categorized_items[race].append(parent_rule.display_string)
parent_to_child.setdefault(parent_rule.display_string, []).append(item_data.code)
else:
categorized_items.setdefault(item_data.race, []).append(item_data.code)
def display_info(element: typing.Union[SC2Race, str, int]) -> tuple:
"""Return (should display, name, type, children, sum(obtained), sum(matching filter))"""
have_item = isinstance(element, int) and element in items_received_set
if isinstance(element, SC2Race):
children: typing.Sequence[typing.Union[str, int]] = categorized_items[faction]
name = element.name
elif isinstance(element, int):
children = parent_to_child.get(element, [])
name = self.ctx.item_names.lookup_in_game(element)
else:
assert isinstance(element, str)
children = parent_to_child[element]
name = element
matches_filter = item_matches_filter(name)
child_states = [display_info(child) for child in children]
return (
(have_item and matches_filter) or any(child_state[0] for child_state in child_states),
name,
element,
child_states,
sum(child_state[4] for child_state in child_states) + have_item,
sum(child_state[5] for child_state in child_states) + (have_item and matches_filter),
)
def display_tree(
should_display: bool, name: str, element: typing.Union[SC2Race, str, int], child_states: tuple, indent: int = 0
) -> None:
if not should_display:
return
assert self.ctx.slot is not None
indent_str = " " * indent
if isinstance(element, SC2Race):
self.formatted_print(f" [u]{name}[/u] ")
for child in child_states:
display_tree(*child[:4])
elif isinstance(element, str):
ColouredMessage(indent_str)("- ").coloured(name, "white").send(self.ctx)
for child in child_states:
display_tree(*child[:4], indent=indent+2)
elif isinstance(element, int):
items = items_received.get(element, [])
if not items:
ColouredMessage(indent_str)("- ").coloured(name, "red")(" - not obtained").send(self.ctx)
for item in items:
(ColouredMessage(indent_str)('- ')
.item(item.item, self.ctx.slot, flags=item.flags)
(" from ").location(item.location, item.player)
(" by ").player(item.player)
).send(self.ctx)
for child in child_states:
display_tree(*child[:4], indent=indent+2)
non_matching_descendents = sum(child[5] - child[4] for child in children)
if non_matching_descendents > 0:
self.formatted_print(f"{indent_str} + {non_matching_descendents} child items that don't match the filter")
item_types_obtained = 0
items_obtained_matching_filter = 0
for faction in SC2Race:
should_display, name, element, children, faction_items_obtained, faction_items_matching_filter = display_info(faction)
item_types_obtained += faction_items_obtained
items_obtained_matching_filter += faction_items_matching_filter
display_tree(should_display, name, element, children)
if filter_search == "":
self.formatted_print(f"[b]Obtained: {len(self.ctx.items_received)} items ({item_types_obtained} types)[/b]")
else:
self.formatted_print(f"[b]Filter \"{filter_search}\" found {items_obtained_matching_filter} out of {item_types_obtained} obtained item types[/b]")
return True
def _received_recent(self, amount: str) -> bool:
assert self.ctx.slot is not None
try:
display_amount = int(amount)
except ValueError:
display_amount = 20
display_amount = min(display_amount, len(self.ctx.items_received))
self.formatted_print(f"Last {display_amount} of {len(self.ctx.items_received)} items received (most recent last):")
for item in self.ctx.items_received[-display_amount:]:
(
ColouredMessage()
.item(item.item, self.ctx.slot, item.flags)
(" from ").location(item.location, item.player)
(" by ").player(item.player)
).send(self.ctx)
return True
def _cmd_option(self, option_name: str = "", option_value: str = "") -> None:
"""Sets a Starcraft game option that can be changed after generation. Use "/option list" to see all options."""
LOGIC_WARNING = " *Note changing this may result in logically unbeatable games*\n"
configurable_options = (
ConfigurableOptionInfo('speed', 'game_speed', options.GameSpeed),
ConfigurableOptionInfo('kerrigan_presence', 'kerrigan_presence', options.KerriganPresence, can_break_logic=True),
ConfigurableOptionInfo('kerrigan_level_cap', 'kerrigan_total_level_cap', options.KerriganTotalLevelCap, ConfigurableOptionType.INTEGER, can_break_logic=True),
ConfigurableOptionInfo('kerrigan_mission_level_cap', 'kerrigan_levels_per_mission_completed_cap', options.KerriganLevelsPerMissionCompletedCap, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('kerrigan_levels_per_mission', 'kerrigan_levels_per_mission_completed', options.KerriganLevelsPerMissionCompleted, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('grant_story_levels', 'grant_story_levels', options.GrantStoryLevels, can_break_logic=True),
ConfigurableOptionInfo('grant_story_tech', 'grant_story_tech', options.GrantStoryTech, can_break_logic=True),
ConfigurableOptionInfo('control_ally', 'take_over_ai_allies', options.TakeOverAIAllies, can_break_logic=True),
ConfigurableOptionInfo('soa_presence', 'spear_of_adun_presence', options.SpearOfAdunPresence, can_break_logic=True),
ConfigurableOptionInfo('soa_in_nobuilds', 'spear_of_adun_present_in_no_build', options.SpearOfAdunPresentInNoBuild, can_break_logic=True),
# Note(mm): Technically SOA passive presence is in the logic for Amon's Fall if Takeover AI Allies is true,
# but that's edge case enough I don't think we should warn about it.
ConfigurableOptionInfo('soa_passive_presence', 'spear_of_adun_passive_ability_presence', options.SpearOfAdunPassiveAbilityPresence),
ConfigurableOptionInfo('soa_passives_in_nobuilds', 'spear_of_adun_passive_present_in_no_build', options.SpearOfAdunPassivesPresentInNoBuild),
ConfigurableOptionInfo('max_upgrade_level', 'max_upgrade_level', options.MaxUpgradeLevel, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('generic_upgrade_research', 'generic_upgrade_research', options.GenericUpgradeResearch),
ConfigurableOptionInfo('generic_upgrade_research_speedup', 'generic_upgrade_research_speedup', options.GenericUpgradeResearchSpeedup),
ConfigurableOptionInfo('minerals_per_item', 'minerals_per_item', options.MineralsPerItem, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('gas_per_item', 'vespene_per_item', options.VespenePerItem, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('supply_per_item', 'starting_supply_per_item', options.StartingSupplyPerItem, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('max_supply_per_item', 'maximum_supply_per_item', options.MaximumSupplyPerItem, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('reduced_supply_per_item', 'maximum_supply_reduction_per_item', options.MaximumSupplyReductionPerItem, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('lowest_max_supply', 'lowest_maximum_supply', options.LowestMaximumSupply, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('research_cost_per_item', 'research_cost_reduction_per_item', options.ResearchCostReductionPerItem, ConfigurableOptionType.INTEGER),
ConfigurableOptionInfo('no_forced_camera', 'disable_forced_camera', options.DisableForcedCamera),
ConfigurableOptionInfo('skip_cutscenes', 'skip_cutscenes', options.SkipCutscenes),
ConfigurableOptionInfo('enable_morphling', 'enable_morphling', options.EnableMorphling, can_break_logic=True),
ConfigurableOptionInfo('difficulty_damage_modifier', 'difficulty_damage_modifier', options.DifficultyDamageModifier),
ConfigurableOptionInfo('void_trade_age_limit', 'trade_age_limit', options.VoidTradeAgeLimit),
ConfigurableOptionInfo('void_trade_workers', 'trade_workers_allowed', options.VoidTradeWorkers),
ConfigurableOptionInfo('mercenary_highlanders', 'mercenary_highlanders', options.MercenaryHighlanders),
)
WARNING_COLOUR = "salmon"
CMD_COLOUR = "slateblue"
boolean_option_map = {
'y': 'true', 'yes': 'true', 'n': 'false', 'no': 'false', 'true': 'true', 'false': 'false',
}
help_message = ColouredMessage(inspect.cleandoc("""
Options
--------------------
"""))('\n')
for option in configurable_options:
option_help_text = inspect.cleandoc(option.option_class.__doc__ or "No description provided.").split('\n', 1)[0]
help_message.coloured(option.name, CMD_COLOUR)(": " + " | ".join(option.option_class.options)
+ f" -- {option_help_text}\n")
if option.can_break_logic:
help_message.coloured(LOGIC_WARNING, WARNING_COLOUR)
help_message("--------------------\nEnter an option without arguments to see its current value.\n")
if not option_name or option_name == 'list' or option_name == 'help':
help_message.send(self.ctx)
return
for option in configurable_options:
if option_name == option.name:
option_value = boolean_option_map.get(option_value.lower(), option_value)
if not option_value:
pass
elif option.option_type == ConfigurableOptionType.ENUM and option_value in option.option_class.options:
self.ctx.__dict__[option.variable_name] = option.option_class.options[option_value]
elif option.option_type == ConfigurableOptionType.INTEGER:
try:
self.ctx.__dict__[option.variable_name] = int(option_value, base=0)
except:
self.output(f"{option_value} is not a valid integer")
else:
self.output(f"Unknown option value '{option_value}'")
ColouredMessage(f"{option.name} is '{option.option_class.get_option_name(self.ctx.__dict__[option.variable_name])}'").send(self.ctx)
break
else:
self.output(f"Unknown option '{option_name}'")
help_message.send(self.ctx)
def _cmd_color(self, faction: str = "", color: str = "") -> None:
"""Changes the player color for a given faction."""
player_colors = [
"White", "Red", "Blue", "Teal",
"Purple", "Yellow", "Orange", "Green",
"LightPink", "Violet", "LightGrey", "DarkGreen",
"Brown", "LightGreen", "DarkGrey", "Pink",
"Rainbow", "Mengsk", "BrightLime", "Arcane", "Ember", "HotPink",
"Random", "Default"
]
var_names = {
'raynor': 'player_color_raynor',
'kerrigan': 'player_color_zerg',
'primal': 'player_color_zerg_primal',
'protoss': 'player_color_protoss',
'nova': 'player_color_nova',
}
faction = faction.lower()
if not faction:
for faction_name, key in var_names.items():
self.output(f"Current player color for {faction_name}: {player_colors[self.ctx.__dict__[key]]}")
self.output("To change your color, add the faction name and color after the command.")
self.output("Available factions: " + ', '.join(var_names))
self.output("Available colors: " + ', '.join(player_colors))
return
elif faction not in var_names:
self.output(f"Unknown faction '{faction}'.")
self.output("Available factions: " + ', '.join(var_names))
return
match_colors = [player_color.lower() for player_color in player_colors]
if not color:
self.output(f"Current player color for {faction}: {player_colors[self.ctx.__dict__[var_names[faction]]]}")
self.output("To change this faction's colors, add the name of the color after the command.")
self.output("Available colors: " + ', '.join(player_colors))
else:
if color.lower() not in match_colors:
self.output(color + " is not a valid color. Available colors: " + ', '.join(player_colors))
return
if color.lower() == "random":
color = random.choice(player_colors[:-2])
self.ctx.__dict__[var_names[faction]] = match_colors.index(color.lower())
self.ctx.pending_color_update = True
self.output(f"Color for {faction} set to " + player_colors[self.ctx.__dict__[var_names[faction]]])
def _cmd_windowed_mode(self, value="") -> None:
"""Controls whether sc2 will launch in Windowed mode. Persists across sessions."""
if not value:
sc2_logger.info("Use `/windowed_mode [true|false]` to set the windowed mode")
elif value.casefold() in ('t', 'true', 'yes', 'y'):
SC2World.settings.game_windowed_mode = True
force_settings_save_on_close()
else:
SC2World.settings.game_windowed_mode = False
force_settings_save_on_close()
sc2_logger.info(f"Windowed mode is: {SC2World.settings.game_windowed_mode}")
def _cmd_disable_mission_check(self) -> bool:
"""Disables the check to see if a mission is available to play. Meant for co-op runs where one player can play
the next mission in a chain the other player is doing."""
self.ctx.missions_unlocked = True
sc2_logger.info("Mission check has been disabled")
return True
@mark_raw
def _cmd_set_path(self, path: str = '') -> bool:
"""Manually set the SC2 install directory (if the automatic detection fails)."""
if path:
os.environ["SC2PATH"] = path
is_mod_installed_correctly()
return True
else:
sc2_logger.warning("When using set_path, you must type the path to your SC2 install directory.")
return False
def _cmd_download_data(self) -> bool:
"""Download the most recent release of the necessary files for playing SC2 with
Archipelago. Will overwrite existing files."""
pool.submit(self._download_data, self.ctx)
return True
@staticmethod
def _download_data(ctx: SC2Context) -> bool:
if "SC2PATH" not in os.environ:
check_game_install_path()
if os.path.exists(get_metadata_file()):
with open(get_metadata_file(), "r") as f:
metadata = f.read()
else:
metadata = None
tempzip, metadata = download_latest_release_zip(
DATA_REPO_OWNER, DATA_REPO_NAME, DATA_API_VERSION, metadata=metadata, force_download=True)
if tempzip:
try:
zipfile.ZipFile(tempzip).extractall(path=os.environ["SC2PATH"])
sc2_logger.info("Download complete. Package installed.")
if metadata is not None:
with open(get_metadata_file(), "w") as f:
f.write(metadata)
finally:
os.remove(tempzip)
else:
sc2_logger.warning("Download aborted/failed. Read the log for more information.")
return False
ctx.data_out_of_date = False
return True
class SC2JSONtoTextParser(JSONtoTextParser):
def __init__(self, ctx: SC2Context) -> None:
self.handlers = {
"ItemSend": self._handle_color,
"ItemCheat": self._handle_color,
"Hint": self._handle_color,
}
super().__init__(ctx)
def _handle_color(self, node: JSONMessagePart) -> str:
codes = node["color"].split(";")
buffer = "".join(self.color_code(code) for code in codes if code in self.color_codes)
return buffer + self._handle_text(node) + '</c>'
def _handle_item_name(self, node: JSONMessagePart) -> str:
if self.ctx.slot_info[node["player"]].game == STARCRAFT2:
annotation = ITEM_NAME_ANNOTATIONS.get(node["text"])
if annotation is not None:
node["text"] += f" {annotation}"
return super()._handle_item_name(node)
def color_code(self, code: str) -> str:
return '<c val="' + self.color_codes[code] + '">'
class SC2Context(CommonContext):
command_processor = StarcraftClientProcessor
game = STARCRAFT2
items_handling = 0b111
def __init__(self, *args, **kwargs) -> None:
super(SC2Context, self).__init__(*args, **kwargs)
self.raw_text_parser = SC2JSONtoTextParser(self)
self.data_out_of_date: bool = False
self.difficulty = -1
self.game_speed = -1
self.disable_forced_camera = 0
self.skip_cutscenes = 0
self.all_in_choice = 0
self.mission_order = 0
self.player_color_raynor = ColorChoice.option_blue
self.player_color_zerg = ColorChoice.option_orange
self.player_color_zerg_primal = ColorChoice.option_purple
self.player_color_protoss = ColorChoice.option_blue
self.player_color_nova = ColorChoice.option_dark_grey
self.pending_color_update = False
self.kerrigan_presence: int = KerriganPresence.default
self.kerrigan_primal_status = 0
self.enable_morphling = EnableMorphling.default
self.custom_mission_order: typing.List[CampaignSlotData] = []
self.mission_id_to_entry_rules: typing.Dict[int, MissionEntryRules]
self.final_mission_ids: typing.List[int] = [29]
self.final_locations: typing.List[int] = []
self.announcements: queue.Queue = queue.Queue()
self.sc2_run_task: typing.Optional[asyncio.Task] = None
self.missions_unlocked: bool = False # allow launching missions ignoring requirements
self.max_upgrade_level: int = MaxUpgradeLevel.default
self.generic_upgrade_missions = 0
self.generic_upgrade_research = 0
self.generic_upgrade_research_speedup: int = GenericUpgradeResearchSpeedup.default
self.generic_upgrade_items = 0
self.location_inclusions: typing.Dict[LocationType, int] = {}
self.location_inclusions_by_flag: typing.Dict[LocationFlag, int] = {}
self.plando_locations: typing.List[str] = []
self.difficulty_override = -1
self.game_speed_override = -1
self.mission_id_to_location_ids: typing.Dict[int, typing.List[int]] = {}
self.last_bot: typing.Optional[ArchipelagoBot] = None
self.slot_data_version = 2
self.required_tactics: int = RequiredTactics.default
self.grant_story_tech: int = GrantStoryTech.default
self.grant_story_levels: int = GrantStoryLevels.default
self.take_over_ai_allies: int = TakeOverAIAllies.default
self.spear_of_adun_presence = SpearOfAdunPresence.option_not_present
self.spear_of_adun_present_in_no_build = SpearOfAdunPresentInNoBuild.option_false
self.spear_of_adun_passive_ability_presence = SpearOfAdunPassiveAbilityPresence.option_not_present
self.spear_of_adun_passive_present_in_no_build = SpearOfAdunPassivesPresentInNoBuild.option_false
self.minerals_per_item: int = 15 # For backwards compat with games generated pre-0.4.5
self.vespene_per_item: int = 15 # For backwards compat with games generated pre-0.4.5
self.starting_supply_per_item: int = 2 # For backwards compat with games generated pre-0.4.5
self.maximum_supply_per_item: int = 2
self.maximum_supply_reduction_per_item: int = options.MaximumSupplyReductionPerItem.default
self.lowest_maximum_supply: int = options.LowestMaximumSupply.default
self.research_cost_reduction_per_item: int = options.ResearchCostReductionPerItem.default
self.use_nova_wol_fallback: bool = False
self.use_nova_nco_fallback: bool = False
self.mercenary_highlanders: bool = False
self.kerrigan_levels_per_mission_completed = 0
self.trade_enabled: int = EnableVoidTrade.default
self.trade_age_limit: int = VoidTradeAgeLimit.default
self.trade_workers_allowed: int = VoidTradeWorkers.default
self.trade_underway: bool = False
self.trade_latest_reply: typing.Optional[dict] = None
self.trade_reply_event = asyncio.Event()
self.trade_lock_wait: int = 0
self.trade_lock_start: typing.Optional[float] = None
self.trade_response: typing.Optional[str] = None
self.difficulty_damage_modifier: int = DifficultyDamageModifier.default
self.mission_order_scouting = MissionOrderScouting.option_none
self.mission_item_classification: typing.Optional[typing.Dict[str, int]] = None
self.war_council_nerfs: bool = False
async def server_auth(self, password_requested: bool = False) -> None:
self.game = STARCRAFT2
if password_requested and not self.password:
await super(SC2Context, self).server_auth(password_requested)
await self.get_username()
await self.send_connect()
if self.ui:
self.ui.first_check = True
def is_legacy_game(self):
return self.game == STARCRAFT2_WOL
def event_invalid_game(self):
if self.is_legacy_game():
self.game = STARCRAFT2
super().event_invalid_game()
else:
self.game = STARCRAFT2_WOL
async_start(self.send_connect())
def trade_storage_team(self) -> str:
return f"{TRADE_DATASTORAGE_TEAM}{self.team}"
def trade_storage_slot(self) -> str:
return f"{TRADE_DATASTORAGE_SLOT}{self.slot}"
def _apply_host_settings_to_options(self) -> None:
if str(SC2World.settings.game_difficulty).casefold() == 'casual':
self.difficulty = GameDifficulty.option_casual
elif str(SC2World.settings.game_difficulty).casefold() == 'normal':
self.difficulty = GameDifficulty.option_normal
elif str(SC2World.settings.game_difficulty).casefold() == 'hard':
self.difficulty = GameDifficulty.option_hard
elif str(SC2World.settings.game_difficulty).casefold() == 'brutal':
self.difficulty = GameDifficulty.option_brutal
if str(SC2World.settings.game_speed).casefold() == 'slower':
self.game_speed = GameSpeed.option_slower
elif str(SC2World.settings.game_speed).casefold() == 'slow':
self.game_speed = GameSpeed.option_slow
elif str(SC2World.settings.game_speed).casefold() == 'normal':
self.game_speed = GameSpeed.option_normal
elif str(SC2World.settings.game_speed).casefold() == 'fast':
self.game_speed = GameSpeed.option_fast
elif str(SC2World.settings.game_speed).casefold() == 'faster':
self.game_speed = GameSpeed.option_faster
if str(SC2World.settings.disable_forced_camera).casefold() == 'true':
self.disable_forced_camera = DisableForcedCamera.option_true
elif str(SC2World.settings.disable_forced_camera).casefold() == 'false':
self.disable_forced_camera = DisableForcedCamera.option_false
if str(SC2World.settings.skip_cutscenes).casefold() == 'true':
self.skip_cutscenes = SkipCutscenes.option_true
elif str(SC2World.settings.skip_cutscenes).casefold() == 'false':
self.skip_cutscenes = SkipCutscenes.option_false
def on_package(self, cmd: str, args: dict) -> None:
if cmd == "Connected":
# Set up the trade storage
async_start(self.send_msgs([
{ # We want to know about other clients' Set commands for locking
"cmd": "SetNotify",
"keys": [self.trade_storage_team()],
},
{
"cmd": "Set",
"key": self.trade_storage_team(),
"default": { TRADE_DATASTORAGE_LOCK: 0 },
"operations": [{"operation": "default", "value": None}] # value is ignored
}
]))
self.difficulty = args["slot_data"]["game_difficulty"]
self.game_speed = args["slot_data"].get("game_speed", GameSpeed.option_default)
self.disable_forced_camera = args["slot_data"].get("disable_forced_camera", DisableForcedCamera.default)
self.skip_cutscenes = args["slot_data"].get("skip_cutscenes", SkipCutscenes.default)
self.all_in_choice = args["slot_data"]["all_in_map"]
self.slot_data_version = args["slot_data"].get("version", 2)
self._apply_host_settings_to_options()
if self.slot_data_version < 4:
# Maintaining backwards compatibility with older slot data
slot_req_table: dict = args["slot_data"]["mission_req"]
first_item = list(slot_req_table.keys())[0]
if first_item in [str(campaign.id) for campaign in SC2Campaign]:
# Multi-campaign
mission_req_table = {}
for campaign_id in slot_req_table:
campaign = lookup_id_to_campaign[int(campaign_id)]
mission_req_table[campaign] = {
mission: self.parse_mission_info(mission_info)
for mission, mission_info in slot_req_table[campaign_id].items()
}
else:
# Old format
mission_req_table = {SC2Campaign.GLOBAL: {
mission: self.parse_mission_info(mission_info)
for mission, mission_info in slot_req_table.items()
}
}
self.custom_mission_order = self.parse_mission_req_table(mission_req_table)
if self.slot_data_version >= 4:
self.custom_mission_order = [
CampaignSlotData(
**{field:value for field, value in campaign_data.items() if field not in ["layouts", "entry_rule"]},
entry_rule = SubRuleRuleData.parse_from_dict(campaign_data["entry_rule"]),
layouts = [
LayoutSlotData(
**{field:value for field, value in layout_data.items() if field not in ["missions", "entry_rule"]},
entry_rule = SubRuleRuleData.parse_from_dict(layout_data["entry_rule"]),
missions = [
[
MissionSlotData(
**{field:value for field, value in mission_data.items() if field != "entry_rule"},
entry_rule = SubRuleRuleData.parse_from_dict(mission_data["entry_rule"])
) for mission_data in column
] for column in layout_data["missions"]
]
) for layout_data in campaign_data["layouts"]
]
) for campaign_data in args["slot_data"]["custom_mission_order"]
]
self.mission_id_to_entry_rules = {
mission.mission_id: MissionEntryRules(mission.entry_rule, layout.entry_rule, campaign.entry_rule)
for campaign in self.custom_mission_order for layout in campaign.layouts
for column in layout.missions for mission in column
}
self.mission_order = args["slot_data"].get("mission_order", MissionOrder.option_vanilla)
if self.slot_data_version < 4:
self.final_mission_ids = [args["slot_data"].get("final_mission", SC2Mission.ALL_IN.id)]
else:
self.final_mission_ids = args["slot_data"].get("final_mission_ids", [SC2Mission.ALL_IN.id])
self.final_locations = [get_location_id(mission_id, 0) for mission_id in self.final_mission_ids]
self.player_color_raynor = _remap_color_option(
self.slot_data_version,
args["slot_data"].get("player_color_terran_raynor", ColorChoice.option_blue)
)
self.player_color_zerg = _remap_color_option(
self.slot_data_version,
args["slot_data"].get("player_color_zerg", ColorChoice.option_orange)
)
self.player_color_zerg_primal = _remap_color_option(
self.slot_data_version,
args["slot_data"].get("player_color_zerg_primal", ColorChoice.option_purple)
)
self.player_color_protoss = _remap_color_option(
self.slot_data_version,
args["slot_data"].get("player_color_protoss", ColorChoice.option_blue)
)
self.player_color_nova = _remap_color_option(
self.slot_data_version,
args["slot_data"].get("player_color_nova", ColorChoice.option_dark_grey)
)
self.war_council_nerfs = args["slot_data"].get("war_council_nerfs", WarCouncilNerfs.option_false)
self.mercenary_highlanders = args["slot_data"].get("mercenary_highlanders", MercenaryHighlanders.option_false)
self.generic_upgrade_missions = args["slot_data"].get("generic_upgrade_missions", GenericUpgradeMissions.default)
self.max_upgrade_level = args["slot_data"].get("max_upgrade_level", MaxUpgradeLevel.default)
self.generic_upgrade_items = args["slot_data"].get("generic_upgrade_items", GenericUpgradeItems.option_individual_items)
self.generic_upgrade_research = args["slot_data"].get("generic_upgrade_research", GenericUpgradeResearch.option_vanilla)
self.generic_upgrade_research_speedup = args["slot_data"].get("generic_upgrade_research_speedup", GenericUpgradeResearchSpeedup.default)
self.kerrigan_presence = args["slot_data"].get("kerrigan_presence", KerriganPresence.option_vanilla)
self.kerrigan_primal_status = args["slot_data"].get("kerrigan_primal_status", KerriganPrimalStatus.option_vanilla)
self.kerrigan_levels_per_mission_completed = args["slot_data"].get("kerrigan_levels_per_mission_completed", 0)
self.kerrigan_levels_per_mission_completed_cap = args["slot_data"].get("kerrigan_levels_per_mission_completed_cap", -1)
self.kerrigan_total_level_cap = args["slot_data"].get("kerrigan_total_level_cap", -1)
self.enable_morphling = args["slot_data"].get("enable_morphling", EnableMorphling.option_false)
self.grant_story_tech = args["slot_data"].get("grant_story_tech", GrantStoryTech.option_no_grant)
self.grant_story_levels = args["slot_data"].get("grant_story_levels", GrantStoryLevels.option_additive)
self.required_tactics = args["slot_data"].get("required_tactics", RequiredTactics.option_standard)
self.take_over_ai_allies = args["slot_data"].get("take_over_ai_allies", TakeOverAIAllies.option_false)
self.spear_of_adun_presence = args["slot_data"].get("spear_of_adun_presence", SpearOfAdunPresence.option_not_present)
self.spear_of_adun_present_in_no_build = args["slot_data"].get("spear_of_adun_present_in_no_build", SpearOfAdunPresentInNoBuild.option_false)
if self.slot_data_version < 4:
self.spear_of_adun_passive_ability_presence = args["slot_data"].get("spear_of_adun_autonomously_cast_ability_presence", SpearOfAdunPassiveAbilityPresence.option_not_present)
self.spear_of_adun_passive_present_in_no_build = args["slot_data"].get("spear_of_adun_autonomously_cast_present_in_no_build", SpearOfAdunPassivesPresentInNoBuild.option_false)
else:
self.spear_of_adun_passive_ability_presence = args["slot_data"].get("spear_of_adun_passive_ability_presence", SpearOfAdunPassiveAbilityPresence.option_not_present)
self.spear_of_adun_passive_present_in_no_build = args["slot_data"].get("spear_of_adun_passive_present_in_no_build", SpearOfAdunPassivesPresentInNoBuild.option_false)
self.minerals_per_item = args["slot_data"].get("minerals_per_item", 15)
self.vespene_per_item = args["slot_data"].get("vespene_per_item", 15)
self.starting_supply_per_item = args["slot_data"].get("starting_supply_per_item", 2)
self.maximum_supply_per_item = args["slot_data"].get("maximum_supply_per_item", options.MaximumSupplyPerItem.default)
self.maximum_supply_reduction_per_item = args["slot_data"].get("maximum_supply_reduction_per_item", options.MaximumSupplyReductionPerItem.default)
self.lowest_maximum_supply = args["slot_data"].get("lowest_maximum_supply", options.LowestMaximumSupply.default)
self.research_cost_reduction_per_item = args["slot_data"].get("research_cost_reduction_per_item", options.ResearchCostReductionPerItem.default)
self.use_nova_wol_fallback = args["slot_data"].get("use_nova_wol_fallback", True)
if self.slot_data_version < 4:
self.use_nova_nco_fallback = args["slot_data"].get("nova_covert_ops_only", False) and self.mission_order == MissionOrder.option_vanilla
else:
self.use_nova_nco_fallback = args["slot_data"].get("use_nova_nco_fallback", False)
self.trade_enabled = args["slot_data"].get("enable_void_trade", EnableVoidTrade.option_false)
self.trade_age_limit = args["slot_data"].get("void_trade_age_limit", VoidTradeAgeLimit.default)
self.trade_workers_allowed = args["slot_data"].get("void_trade_workers", VoidTradeWorkers.default)
self.difficulty_damage_modifier = args["slot_data"].get("difficulty_damage_modifier", DifficultyDamageModifier.option_true)
self.mission_order_scouting = args["slot_data"].get("mission_order_scouting", MissionOrderScouting.option_none)
self.mission_item_classification = args["slot_data"].get("mission_item_classification")
if self.required_tactics == RequiredTactics.option_no_logic:
# Locking Grant Story Tech/Levels if no logic
self.grant_story_tech = GrantStoryTech.option_grant
self.grant_story_levels = GrantStoryLevels.option_minimum
self.location_inclusions = {
LocationType.VICTORY: LocationInclusion.option_enabled, # Victory checks are always enabled
LocationType.VICTORY_CACHE: LocationInclusion.option_enabled, # Victory checks are always enabled
LocationType.VANILLA: args["slot_data"].get("vanilla_locations", VanillaLocations.default),
LocationType.EXTRA: args["slot_data"].get("extra_locations", ExtraLocations.default),
LocationType.CHALLENGE: args["slot_data"].get("challenge_locations", ChallengeLocations.default),
LocationType.MASTERY: args["slot_data"].get("mastery_locations", MasteryLocations.default),
}
self.location_inclusions_by_flag = {
LocationFlag.SPEEDRUN: args["slot_data"].get("speedrun_locations", SpeedrunLocations.default),
LocationFlag.PREVENTATIVE: args["slot_data"].get("preventative_locations", PreventativeLocations.default),
}
self.plando_locations = args["slot_data"].get("plando_locations", [])
self.build_location_to_mission_mapping()
# Looks for the required maps and mods for SC2. Runs check_game_install_path.
maps_present = is_mod_installed_correctly()
if os.path.exists(get_metadata_file()):
with open(get_metadata_file(), "r") as f:
current_ver = f.read()
sc2_logger.debug(f"Current version: {current_ver}")
if is_mod_update_available(DATA_REPO_OWNER, DATA_REPO_NAME, DATA_API_VERSION, current_ver):
(
ColouredMessage().coloured("NOTICE: Update for required files found. ", colour="red")
("Run ").coloured("/download_data", colour="slateblue")
(" to install.")
).send(self)
self.data_out_of_date = True
elif maps_present:
(
ColouredMessage()
.coloured("NOTICE: Your map files may be outdated (version number not found). ", colour="red")
("Run ").coloured("/download_data", colour="slateblue")
(" to install.")
).send(self)
self.data_out_of_date = True
ColouredMessage("[b]Check the Launcher tab to start playing.[/b]", keep_markup=True).send(self)
elif cmd == "SetReply":
# Currently can only be Void Trade reply
self.trade_latest_reply = args
self.trade_reply_event.set()
@staticmethod
def parse_mission_info(mission_info: dict[str, typing.Any]) -> MissionInfo:
if mission_info.get("id") is not None:
mission_info["mission"] = lookup_id_to_mission[mission_info["id"]]
elif isinstance(mission_info["mission"], int):
mission_info["mission"] = lookup_id_to_mission[mission_info["mission"]]
return MissionInfo(
**{field: value for field, value in mission_info.items() if field in MissionInfo._fields}
)
@staticmethod
def parse_mission_req_table(mission_req_table: typing.Dict[SC2Campaign, typing.Dict[typing.Any, MissionInfo]]) -> typing.List[CampaignSlotData]:
campaigns: typing.List[typing.Tuple[int, CampaignSlotData]] = []
rolling_rule_id = 0
for (campaign, campaign_data) in mission_req_table.items():
if campaign.campaign_name == "Global":
campaign_name = ""
else:
campaign_name = campaign.campaign_name
categories: typing.Dict[str, typing.List[MissionSlotData]] = {}
for mission in campaign_data.values():
if mission.category not in categories:
categories[mission.category] = []
mission_id = mission.mission.id
sub_rules: typing.List[CountMissionsRuleData] = []
missions: typing.List[int]
if mission.number:
amount = mission.number
missions = [
mission.mission.id
for mission in mission_req_table[campaign].values()
]
sub_rules.append(CountMissionsRuleData(missions, amount, [campaign_name]))
prev_missions: typing.List[int] = []
if len(mission.required_world) > 0:
missions = []
for connection in mission.required_world:
if isinstance(connection, dict):
required_campaign = {}
for camp, camp_data in mission_req_table.items():
if camp.id == connection["campaign"]:
required_campaign = camp_data
break
required_mission_id = connection["connect_to"]
else:
required_campaign = mission_req_table[connection.campaign]
required_mission_id = connection.connect_to
required_mission = list(required_campaign.values())[required_mission_id - 1]
missions.append(required_mission.mission.id)
if required_mission.category == mission.category:
prev_missions.append(required_mission.mission.id)
if mission.or_requirements:
amount = 1
else:
amount = len(missions)
sub_rules.append(CountMissionsRuleData(missions, amount, missions))
entry_rule = SubRuleRuleData(rolling_rule_id, sub_rules, len(sub_rules))
rolling_rule_id += 1
categories[mission.category].append(MissionSlotData.legacy(mission_id, prev_missions, entry_rule))
layouts: typing.List[LayoutSlotData] = []
for (layout, mission_slots) in categories.items():
if layout.startswith("_"):
layout_name = ""
else:
layout_name = layout
layouts.append(LayoutSlotData.legacy(layout_name, [mission_slots]))
campaigns.append((campaign.id, CampaignSlotData.legacy(campaign_name, layouts)))
return [data for (_, data) in sorted(campaigns)]
def on_print_json(self, args: dict) -> None:
# goes to this world
if "receiving" in args and self.slot_concerns_self(args["receiving"]):
relevant = True
# found in this world
elif "item" in args and self.slot_concerns_self(args["item"].player):
relevant = True
# not related
else:
relevant = False
if relevant:
self.announcements.put(self.raw_text_parser(copy.deepcopy(args["data"])))
super(SC2Context, self).on_print_json(args)
def run_gui(self) -> None:
from .client_gui import start_gui
start_gui(self)
async def shutdown(self) -> None:
await super(SC2Context, self).shutdown()
if self.last_bot:
self.last_bot.want_close = True
# If the client is not set up yet, the game is not done loading and must be force-closed
if not hasattr(self.last_bot, "client"):
bot.sc2process.kill_switch.kill_all()
if self.sc2_run_task:
self.sc2_run_task.cancel()
async def disconnect(self, allow_autoreconnect: bool = False):
self.finished_game = False
await super(SC2Context, self).disconnect(allow_autoreconnect=allow_autoreconnect)
def play_mission(self, mission_id: int) -> bool:
if self.missions_unlocked or is_mission_available(self, mission_id):
if self.sc2_run_task:
if not self.sc2_run_task.done():
sc2_logger.warning("Starcraft 2 Client is still running!")
self.sc2_run_task.cancel() # doesn't actually close the game, just stops the python task
if self.slot is None:
sc2_logger.warning("Launching Mission without Archipelago authentication, "
"checks will not be registered to server.")
self.sc2_run_task = asyncio.create_task(starcraft_launch(self, mission_id),
name="Starcraft 2 Launch")
return True
else:
sc2_logger.info(f"{lookup_id_to_mission[mission_id].mission_name} is not currently unlocked.")
return False
def build_location_to_mission_mapping(self) -> None:
mission_id_to_location_ids: typing.Dict[int, typing.Set[int]] = {
mission.mission_id: set()
for campaign in self.custom_mission_order for layout in campaign.layouts
for column in layout.missions for mission in column
}
for loc in self.server_locations:
offset = (
SC2WOL_LOC_ID_OFFSET
if loc < SC2HOTS_LOC_ID_OFFSET
else (SC2HOTS_LOC_ID_OFFSET - SC2Mission.ALL_IN.id * VICTORY_MODULO)
)
mission_id, objective = divmod(loc - offset, VICTORY_MODULO)
mission_id_to_location_ids[mission_id].add(objective)
self.mission_id_to_location_ids = {
mission_id: sorted(objectives)
for mission_id, objectives in mission_id_to_location_ids.items()
}
def locations_for_mission(self, mission: SC2Mission) -> typing.Iterable[int]:
mission_id: int = mission.id
objectives = self.mission_id_to_location_ids[mission_id]
for objective in objectives:
yield get_location_id(mission_id, objective)
def locations_for_mission_id(self, mission_id: int) -> typing.Iterable[int]:
objectives = self.mission_id_to_location_ids[mission_id]
for objective in objectives:
yield get_location_id(mission_id, objective)
def uncollected_locations_in_mission(self, mission: SC2Mission) -> typing.Iterable[int]:
for location_id in self.locations_for_mission(mission):
if location_id in self.missing_locations:
yield location_id
def is_mission_completed(self, mission_id: int) -> bool:
return get_location_id(mission_id, 0) in self.checked_locations
async def trade_acquire_storage(self, keep_trying: bool = False) -> typing.Optional[dict]:
# This function was largely taken from the Pokemon Emerald client
"""
Acquires a lock on the Void Trade DataStorage.
Locking the key means you have exclusive access
to modifying the value until you unlock it or the key expires (5 seconds).
If `keep_trying` is `True`, it will keep trying to acquire the lock
until successful. Otherwise it will return `None` if it fails to
acquire the lock.
"""
while not self.exit_event.is_set() and self.last_bot and self.last_bot.game_running:
lock = int(time.time_ns() / 1000000000) # in seconds
# Make sure we're not past the waiting limit
# SC2 needs to be notified within 10 minutes of game time (training time of the dummy units)
if self.trade_lock_start is not None:
if self.last_bot.time - self.trade_lock_start >= TRADE_LOCK_WAIT_LIMIT:
self.trade_lock_wait = 0
self.trade_lock_start = None
return None
elif keep_trying:
self.trade_lock_start = self.last_bot.time
message_uuid = str(uuid.uuid4())
await self.send_msgs([{
"cmd": "Set",
"key": self.trade_storage_team(),
"default": { TRADE_DATASTORAGE_LOCK: 0 },
"want_reply": True,
"operations": [{ "operation": "update", "value": { TRADE_DATASTORAGE_LOCK: lock } }],
"uuid": message_uuid,
}])
self.trade_reply_event.clear()
try:
await asyncio.wait_for(self.trade_reply_event.wait(), 5)
except asyncio.TimeoutError:
if not keep_trying:
return None
continue
assert self.trade_latest_reply is not None
reply = copy.deepcopy(self.trade_latest_reply)
# Make sure the most recently received update was triggered by our lock attempt
if reply.get("uuid", None) != message_uuid:
if not keep_trying:
return None
await asyncio.sleep(TRADE_LOCK_TIME)
continue
# Make sure the current value of the lock is what we set it to
# (I think this should theoretically never run)
if reply["value"][TRADE_DATASTORAGE_LOCK] != lock:
if not keep_trying:
return None
await asyncio.sleep(TRADE_LOCK_TIME)
continue
# Make sure that the lock value we replaced is at least 5 seconds old
# If it was unlocked before our change, its value was 0 and it will look decades old
if lock - reply["original_value"][TRADE_DATASTORAGE_LOCK] < TRADE_LOCK_TIME:
if not keep_trying:
return None
# Multiple clients trying to lock the key may get stuck in a loop of checking the lock
# by trying to set it, which will extend its expiration. So if we see that the lock was
# too new when we replaced it, we should wait for increasingly longer periods so that
# eventually the lock will expire and a client will acquire it.
self.trade_lock_wait += TRADE_LOCK_TIME
self.trade_lock_wait += random.randrange(100, 500) / 1000
await asyncio.sleep(self.trade_lock_wait)
continue
# We have the lock, reset the waiting period and return
self.trade_lock_wait = 0
self.trade_lock_start = None
return reply
return None
async def trade_receive(self, amount: int = 1):
"""
Tries to pop `amount` units out of the trade storage.
"""
reply = await self.trade_acquire_storage(True)
if reply is None:
self.trade_response = "?TradeFail Void Trade failed: Could not communicate with server. Trade cost refunded."
return None
# Find available units
# Ignore units we sent ourselves
allowed_slots: typing.List[str] = [
slot for slot in reply["value"]
if slot != TRADE_DATASTORAGE_LOCK \
and slot != self.trade_storage_slot()
]
# Filter out trades that are too old
if self.trade_age_limit != VoidTradeAgeLimit.option_disabled:
trade_time = reply["value"][TRADE_DATASTORAGE_LOCK]
allowed_age = void_trade_age_limits_ms[self.trade_age_limit]
is_young_enough = lambda send_time: trade_time - send_time <= allowed_age
else:
is_young_enough = lambda _: True
# Filter out banned units
if self.trade_workers_allowed == VoidTradeWorkers.option_false:
is_unit_allowed = lambda unit: unit not in worker_units
else:
is_unit_allowed = lambda _: True
available_units: typing.List[typing.Tuple[str, str, int]] = []
available_counts: typing.List[int] = []
for slot in allowed_slots:
for (send_time, units) in reply["value"][slot].items():
if is_young_enough(int(send_time)):
for (unit, count) in units.items():
if is_unit_allowed(str(unit)):
available_units.append((unit, slot, send_time))
available_counts.append(count)
# Pick units to receive
# If there's not enough units in total, just pick as many as possible
# SC2 should handle the refund
available = sum(available_counts)
refunds = 0
if available < amount:
refunds = amount - available
amount = available
if available == 0:
# random.sample crashes if counts is an empty list
units = []
else:
units = random.sample(available_units, amount, counts = available_counts)
# Build response data
unit_counts: typing.Dict[str, int] = {}
slots_to_update: typing.Dict[str, typing.Dict[int, typing.Dict[str, int]]] = {}
for (unit, slot, send_time) in units:
unit_counts[unit] = unit_counts.get(unit, 0) + 1
if slot not in slots_to_update:
slots_to_update[slot] = copy.deepcopy(reply["value"][slot])
slots_to_update[slot][send_time][unit] -= 1
# Clean up units that were removed completely
if slots_to_update[slot][send_time][unit] == 0:
slots_to_update[slot][send_time].pop(unit)
# Clean up trades that were completely exhausted
if len(slots_to_update[slot][send_time]) == 0:
slots_to_update[slot].pop(send_time)
await self.send_msgs([
{ # Update server storage
"cmd": "Set",
"key": self.trade_storage_team(),
"operations": [{ "operation": "update", "value": slots_to_update }]
},
{ # Release the lock
"cmd": "Set",
"key": self.trade_storage_team(),
"operations": [{ "operation": "update", "value": { TRADE_DATASTORAGE_LOCK: 0 } }]
}
])
# Give units to bot
self.trade_response = f"?Trade {refunds} " + " ".join(f"{unit} {count}" for (unit, count) in unit_counts.items())
async def trade_send(self, units: typing.List[str]):
"""
Tries to upload `units` to the trade DataStorage.
"""
reply = await self.trade_acquire_storage(True)
if reply is None:
self.trade_response = "?TradeFail Void Trade failed: Could not communicate with server. Your units remain."
return None
# Create a storage entry for the time the trade was confirmed
trade_time = reply["value"][TRADE_DATASTORAGE_LOCK]
storage_entry = {}
for unit in units:
storage_entry[unit] = storage_entry.get(unit, 0) + 1
# Update the storage with the new units
data: typing.Dict[int, typing.Dict[str, int]] = copy.deepcopy(reply["value"].get(self.trade_storage_slot(), {}))
data[trade_time] = storage_entry
await self.send_msgs([
{ # Send the updated data
"cmd": "Set",
"key": self.trade_storage_team(),
"operations": [{ "operation": "update", "value": { self.trade_storage_slot(): data } }]
},
{ # Release the lock
"cmd": "Set",
"key": self.trade_storage_team(),
"operations": [{ "operation": "update", "value": { TRADE_DATASTORAGE_LOCK: 0 } }]
}
])
# Notify the game
self.trade_response = "?TradeSuccess Void Trade successful: Units sent!"
class CompatItemHolder(typing.NamedTuple):
name: str
quantity: int = 1
async def main(args: typing.Sequence[str] | None):
multiprocessing.freeze_support()
parser = get_base_parser()
parser.add_argument('--name', default=None, help="Slot Name to connect as.")
args, uri = parser.parse_known_args(args)
if uri and uri[0].startswith('archipelago://'):
args.url = uri[0]
handle_url_arg(args, parser)
ctx = SC2Context(args.connect, args.password)
ctx.auth = args.name
if ctx.server_task is None:
ctx.server_task = asyncio.create_task(server_loop(ctx), name="ServerLoop")
if gui_enabled:
ctx.run_gui()
ctx.run_cli()
await ctx.exit_event.wait()
await ctx.shutdown()
# These items must be given to the player if the game is generated on older versions
API2_TO_API3_COMPAT_ITEMS: typing.Set[CompatItemHolder] = {
CompatItemHolder(item_names.PHOTON_CANNON),
CompatItemHolder(item_names.OBSERVER),
CompatItemHolder(item_names.WARP_HARMONIZATION),
CompatItemHolder(item_names.PROGRESSIVE_PROTOSS_WEAPON_ARMOR_UPGRADE, 3)
}
API3_TO_API4_COMPAT_ITEMS: typing.Set[CompatItemHolder] = {
# War Council
CompatItemHolder(item_names.ZEALOT_WHIRLWIND),
CompatItemHolder(item_names.CENTURION_RESOURCE_EFFICIENCY),
CompatItemHolder(item_names.SENTINEL_RESOURCE_EFFICIENCY),
CompatItemHolder(item_names.STALKER_PHASE_REACTOR),
CompatItemHolder(item_names.DRAGOON_PHALANX_SUIT),
CompatItemHolder(item_names.INSTIGATOR_MODERNIZED_SERVOS),
CompatItemHolder(item_names.ADEPT_DISRUPTIVE_TRANSFER),
CompatItemHolder(item_names.SLAYER_PHASE_BLINK),
CompatItemHolder(item_names.AVENGER_KRYHAS_CLOAK),
CompatItemHolder(item_names.DARK_TEMPLAR_LESSER_SHADOW_FURY),
CompatItemHolder(item_names.DARK_TEMPLAR_GREATER_SHADOW_FURY),
CompatItemHolder(item_names.BLOOD_HUNTER_BRUTAL_EFFICIENCY),
CompatItemHolder(item_names.SENTRY_DOUBLE_SHIELD_RECHARGE),
CompatItemHolder(item_names.ENERGIZER_MOBILE_CHRONO_BEAM),
CompatItemHolder(item_names.HAVOC_ENDURING_SIGHT),
CompatItemHolder(item_names.HIGH_TEMPLAR_PLASMA_SURGE),
CompatItemHolder(item_names.SIGNIFIER_FEEDBACK),
CompatItemHolder(item_names.ASCENDANT_BREATH_OF_CREATION),
CompatItemHolder(item_names.DARK_ARCHON_INDOMITABLE_WILL),
CompatItemHolder(item_names.IMMORTAL_IMPROVED_BARRIER),
CompatItemHolder(item_names.VANGUARD_RAPIDFIRE_CANNON),
CompatItemHolder(item_names.VANGUARD_FUSION_MORTARS),
CompatItemHolder(item_names.ANNIHILATOR_TWILIGHT_CHASSIS),
CompatItemHolder(item_names.COLOSSUS_FIRE_LANCE),
CompatItemHolder(item_names.WRATHWALKER_AERIAL_TRACKING),
CompatItemHolder(item_names.REAVER_KHALAI_REPLICATORS),
CompatItemHolder(item_names.PHOENIX_DOUBLE_GRAVITON_BEAM),
CompatItemHolder(item_names.CORSAIR_NETWORK_DISRUPTION),
CompatItemHolder(item_names.MIRAGE_GRAVITON_BEAM),
CompatItemHolder(item_names.VOID_RAY_PRISMATIC_RANGE),
CompatItemHolder(item_names.CARRIER_REPAIR_DRONES),
CompatItemHolder(item_names.TEMPEST_DISINTEGRATION),
CompatItemHolder(item_names.ARBITER_VESSEL_OF_THE_CONCLAVE),
CompatItemHolder(item_names.MOTHERSHIP_INTEGRATED_POWER),
# Other items
CompatItemHolder(item_names.ASCENDANT_ARCHON_MERGE),
CompatItemHolder(item_names.DARK_TEMPLAR_ARCHON_MERGE),
CompatItemHolder(item_names.SPORE_CRAWLER_BIO_BONUS),
}
def compat_item_to_network_items(compat_item: CompatItemHolder) -> typing.List[NetworkItem]:
item_id = get_full_item_list()[compat_item.name].code
network_item = NetworkItem(item_id, 0, 0, 0)
return compat_item.quantity * [network_item]
def calculate_items(ctx: SC2Context) -> typing.Dict[SC2Race, typing.List[int]]:
items = ctx.items_received.copy()
item_list = get_full_item_list()
def create_network_item(item_name: str) -> NetworkItem:
return NetworkItem(item_list[item_name].code, 0, 0, 0)
# Items unlocked in earlier generator versions by default (Prophecy defaults, war council, rebalances)
if ctx.slot_data_version < 3:
for compat_item in API2_TO_API3_COMPAT_ITEMS:
items.extend(compat_item_to_network_items(compat_item))
if ctx.slot_data_version < 4:
for compat_item in API3_TO_API4_COMPAT_ITEMS:
items.extend(compat_item_to_network_items(compat_item))
received_item_ids = set(item.item for item in ctx.items_received)
if item_list[item_names.GHOST_RESOURCE_EFFICIENCY].code in received_item_ids:
items.append(create_network_item(item_names.GHOST_BARGAIN_BIN_PRICES))
if item_list[item_names.SPECTRE_RESOURCE_EFFICIENCY].code in received_item_ids:
items.append(create_network_item(item_names.SPECTRE_BARGAIN_BIN_PRICES))
if item_list[item_names.ROGUE_FORCES].code in received_item_ids:
items.append(create_network_item(item_names.UNRESTRICTED_MUTATION))
if item_list[item_names.SCOUT_RESOURCE_EFFICIENCY].code in received_item_ids:
items.append(create_network_item(item_names.SCOUT_SUPPLY_EFFICIENCY))
if item_list[item_names.REAVER_RESOURCE_EFFICIENCY].code in received_item_ids:
items.append(create_network_item(item_names.REAVER_BARGAIN_BIN_PRICES))
# API < 4 Orbital Command Count (Deprecated item)
orbital_command_count: int = 0
network_item: NetworkItem
accumulators: typing.Dict[SC2Race, typing.List[int]] = {
race: [0 for element in item_type_enum_class if element.flag_word >= 0]
for race, item_type_enum_class in race_to_item_type.items()
}
# Protoss Shield grouped item specific logic
shields_from_ground_upgrade: int = 0
shields_from_air_upgrade: int = 0
for network_item in items:
name = lookup_id_to_name.get(network_item.item)
if name is None:
continue
item_data: ItemData = item_list[name]
if item_data.type.flag_word < 0:
continue
# exists exactly once
if item_data.quantity == 1 or name in item_name_groups[ItemGroupNames.UNRELEASED_ITEMS]:
accumulators[item_data.race][item_data.type.flag_word] |= 1 << item_data.number
# exists multiple times
elif item_data.quantity > 1:
flaggroup = item_data.type.flag_word
# Generic upgrades apply only to Weapon / Armor upgrades
if item_data.number >= 0:
accumulators[item_data.race][flaggroup] += 1 << item_data.number
else:
if name == item_names.PROGRESSIVE_PROTOSS_GROUND_UPGRADE:
shields_from_ground_upgrade += 1
if name == item_names.PROGRESSIVE_PROTOSS_AIR_UPGRADE:
shields_from_air_upgrade += 1
for bundled_number in get_bundle_upgrade_member_numbers(name):
accumulators[item_data.race][flaggroup] += 1 << bundled_number
# Regen bio-steel nerf with API3 - undo for older games
if ctx.slot_data_version < 3 and name == item_names.PROGRESSIVE_REGENERATIVE_BIO_STEEL:
current_level = (accumulators[item_data.race][flaggroup] >> item_data.number) % 4
if current_level == 2:
# Switch from level 2 to level 3 for compatibility
accumulators[item_data.race][flaggroup] += 1 << item_data.number
# sum
# Fillers, deprecated items
else:
if name == item_names.PROGRESSIVE_ORBITAL_COMMAND:
orbital_command_count += 1
elif item_data.type == ZergItemType.Level:
accumulators[item_data.race][item_data.type.flag_word] += item_data.number
elif name == item_names.STARTING_MINERALS:
accumulators[item_data.race][item_data.type.flag_word] += ctx.minerals_per_item
elif name == item_names.STARTING_VESPENE:
accumulators[item_data.race][item_data.type.flag_word] += ctx.vespene_per_item
elif name == item_names.STARTING_SUPPLY:
accumulators[item_data.race][item_data.type.flag_word] += ctx.starting_supply_per_item
elif name == item_names.UPGRADE_RESEARCH_COST:
accumulators[item_data.race][item_data.type.flag_word] += ctx.research_cost_reduction_per_item
else:
accumulators[item_data.race][item_data.type.flag_word] += 1
# Fix Shields from generic upgrades by unit class (Maximum of ground/air upgrades)
if shields_from_ground_upgrade > 0 or shields_from_air_upgrade > 0:
shield_upgrade_level = max(shields_from_ground_upgrade, shields_from_air_upgrade)
shield_upgrade_item = item_list[item_names.PROGRESSIVE_PROTOSS_SHIELDS]
for _ in range(0, shield_upgrade_level):
accumulators[shield_upgrade_item.race][shield_upgrade_item.type.flag_word] += 1 << shield_upgrade_item.number
# Deprecated Orbital Command handling (Backwards compatibility):
if orbital_command_count > 0:
orbital_command_replacement_items: typing.List[str] = [
item_names.COMMAND_CENTER_SCANNER_SWEEP,
item_names.COMMAND_CENTER_MULE,
item_names.COMMAND_CENTER_EXTRA_SUPPLIES,
item_names.PLANETARY_FORTRESS_ORBITAL_MODULE
]
replacement_item_ids = [get_full_item_list()[item_name].code for item_name in orbital_command_replacement_items]
if sum(item_id in replacement_item_ids for item_id in items) > 0:
logger.warning(inspect.cleandoc("""
Both old Orbital Command and its replacements are present in the world. Skipping compatibility handling.
"""))
else:
# None of replacement items are present
# L1: MULE and Scanner Sweep
scanner_sweep_data = get_full_item_list()[item_names.COMMAND_CENTER_SCANNER_SWEEP]
mule_data = get_full_item_list()[item_names.COMMAND_CENTER_MULE]
accumulators[scanner_sweep_data.race][scanner_sweep_data.type.flag_word] += 1 << scanner_sweep_data.number
accumulators[mule_data.race][mule_data.type.flag_word] += 1 << mule_data.number
if orbital_command_count >= 2:
# L2 MULE and Scanner Sweep usable even in Planetary Fortress Mode
planetary_orbital_module_data = get_full_item_list()[item_names.PLANETARY_FORTRESS_ORBITAL_MODULE]
accumulators[planetary_orbital_module_data.race][planetary_orbital_module_data.type.flag_word] += \
1 << planetary_orbital_module_data.number
# Upgrades from completed missions
if ctx.generic_upgrade_missions > 0:
total_missions = sum(len(column) for campaign in ctx.custom_mission_order for layout in campaign.layouts for column in layout.missions)
num_missions = int((ctx.generic_upgrade_missions / 100) * total_missions)
completed = len([mission_id for mission_id in ctx.mission_id_to_location_ids if ctx.is_mission_completed(mission_id)])
upgrade_count = min(completed // num_missions, ctx.max_upgrade_level) if num_missions > 0 else ctx.max_upgrade_level
upgrade_count = min(upgrade_count, WEAPON_ARMOR_UPGRADE_MAX_LEVEL)
# Equivalent to "Progressive Weapon/Armor Upgrade" item
global_upgrades: typing.Set[str] = upgrade_included_names[GenericUpgradeItems.option_bundle_all]
for global_upgrade in global_upgrades:
race = get_full_item_list()[global_upgrade].race
upgrade_flaggroup = race_to_item_type[race]["Upgrade"].flag_word
for bundled_number in get_bundle_upgrade_member_numbers(global_upgrade):
accumulators[race][upgrade_flaggroup] += upgrade_count << bundled_number
return accumulators
def get_bundle_upgrade_member_numbers(bundled_item: str) -> typing.List[int]:
upgrade_elements: typing.List[str] = upgrade_bundles[bundled_item]
if bundled_item in (item_names.PROGRESSIVE_PROTOSS_GROUND_UPGRADE, item_names.PROGRESSIVE_PROTOSS_AIR_UPGRADE):
# Shields are handled as a maximum of those two
upgrade_elements = [item_name for item_name in upgrade_elements if item_name != item_names.PROGRESSIVE_PROTOSS_SHIELDS]
return [get_full_item_list()[item_name].number for item_name in upgrade_elements]
def calc_difficulty(difficulty: int):
if difficulty == 0:
return 'C'
elif difficulty == 1:
return 'N'
elif difficulty == 2:
return 'H'
elif difficulty == 3:
return 'B'
return 'X'
def get_kerrigan_level(ctx: SC2Context, items: typing.Dict[SC2Race, typing.List[int]], missions_beaten: int) -> int:
item_value = items[SC2Race.ZERG][ZergItemType.Level.flag_word]
mission_value = missions_beaten * ctx.kerrigan_levels_per_mission_completed
if ctx.kerrigan_levels_per_mission_completed_cap != -1:
mission_value = min(mission_value, ctx.kerrigan_levels_per_mission_completed_cap)
total_value = item_value + mission_value
if ctx.kerrigan_total_level_cap != -1:
total_value = min(total_value, ctx.kerrigan_total_level_cap)
return total_value
def calculate_kerrigan_options(ctx: SC2Context) -> int:
result = 0
# Bits 0, 1
# Kerrigan unit available
if ctx.kerrigan_presence in kerrigan_unit_available:
result |= 1 << 0
# Bit 2
# Kerrigan primal status by map
if ctx.kerrigan_primal_status == KerriganPrimalStatus.option_vanilla:
result |= 1 << 2
return result
def caclulate_soa_options(ctx: SC2Context, mission: SC2Mission) -> int:
"""
Pack SOA options into a single integer with bitflags.
0b000011 = SOA presence
0b000100 = SOA in no-builds
0b011000 = Passives presence
0b100000 = PAssives in no-builds
"""
result = 0
# Bits 0, 1
# SoA Calldowns available
soa_presence_value = 0
if is_mission_in_soa_presence(ctx.spear_of_adun_presence, mission):
soa_presence_value = 3
result |= soa_presence_value << 0
# Bit 2
# SoA Calldowns for no-builds
if ctx.spear_of_adun_present_in_no_build == SpearOfAdunPresentInNoBuild.option_true:
result |= 1 << 2
# Bits 3,4
# Autocasts
soa_autocasts_presence_value = 0
if is_mission_in_soa_presence(ctx.spear_of_adun_passive_ability_presence, mission, SpearOfAdunPassiveAbilityPresence):
soa_autocasts_presence_value = 3
# Guardian Shell breaks without SoA on version 4+, but can be generated without SoA on version 3
if ctx.slot_data_version < 4 and MissionFlag.Protoss in mission.flags:
soa_autocasts_presence_value = 3
result |= soa_autocasts_presence_value << 3
# Bit 5
# Autocasts in no-builds
if ctx.spear_of_adun_passive_present_in_no_build == SpearOfAdunPassivesPresentInNoBuild.option_true:
result |= 1 << 5
return result
def calculate_generic_upgrade_options(ctx: SC2Context) -> int:
result = 0
# Bits 0,1
# Research mode
research_mode_value = 0
if ctx.generic_upgrade_research == GenericUpgradeResearch.option_vanilla:
research_mode_value = 0
elif ctx.generic_upgrade_research == GenericUpgradeResearch.option_auto_in_no_build:
research_mode_value = 1
elif ctx.generic_upgrade_research == GenericUpgradeResearch.option_auto_in_build:
research_mode_value = 2
elif ctx.generic_upgrade_research == GenericUpgradeResearch.option_always_auto:
research_mode_value = 3
result |= research_mode_value << 0
# Bit 2
# Speedup
if ctx.generic_upgrade_research_speedup == GenericUpgradeResearchSpeedup.option_true:
result |= 1 << 2
return result
def calculate_trade_options(ctx: SC2Context) -> int:
result = 0
# Bit 0
# Trade enabled
if ctx.trade_enabled:
result |= 1 << 0
# Bit 1
# Workers allowed
if ctx.trade_workers_allowed == VoidTradeWorkers.option_true:
result |= 1 << 1
return result
def kerrigan_primal(ctx: SC2Context, kerrigan_level: int) -> bool:
if ctx.kerrigan_primal_status == KerriganPrimalStatus.option_always_zerg:
return True
elif ctx.kerrigan_primal_status == KerriganPrimalStatus.option_always_human:
return False
elif ctx.kerrigan_primal_status == KerriganPrimalStatus.option_level_35:
return kerrigan_level >= 35
elif ctx.kerrigan_primal_status == KerriganPrimalStatus.option_half_completion:
total_missions = len(ctx.mission_id_to_location_ids)
completed = sum(ctx.is_mission_completed(mission_id)
for mission_id in ctx.mission_id_to_location_ids)
return completed >= (total_missions / 2)
elif ctx.kerrigan_primal_status == KerriganPrimalStatus.option_item:
codes = [item.item for item in ctx.items_received]
return get_full_item_list()[item_names.KERRIGAN_PRIMAL_FORM].code in codes
return False
def get_mission_variant(mission_id: int) -> int:
mission_flags = lookup_id_to_mission[mission_id].flags
if MissionFlag.RaceSwap not in mission_flags:
return 0
if MissionFlag.Terran in mission_flags:
return 1
elif MissionFlag.Zerg in mission_flags:
return 2
elif MissionFlag.Protoss in mission_flags:
return 3
return 0
def get_item_flag_word(item_name: str) -> int:
return get_full_item_list()[item_name].type.flag_word
async def starcraft_launch(ctx: SC2Context, mission_id: int):
sc2_logger.info(f"Launching {lookup_id_to_mission[mission_id].mission_name}. If game does not launch check log file for errors.")
with DllDirectory(None):
run_game(
bot.maps.get(lookup_id_to_mission[mission_id].map_file),
[Bot(Race.Terran, ArchipelagoBot(ctx, mission_id), name="Archipelago", fullscreen=not SC2World.settings.game_windowed_mode)],
realtime=True,
)
class ArchipelagoBot(bot.bot_ai.BotAI):
__slots__ = [
'game_running',
'mission_completed',
'boni',
'setup_done',
'ctx',
'mission_id',
'want_close',
'can_read_game',
'last_received_update',
'last_trade_cargo',
'last_supply_used'
]
ctx: SC2Context
# defined in bot_ai_internal.py; seems to be mis-annotated as a float and later re-annotated as an int
supply_used: int
def __init__(self, ctx: SC2Context, mission_id: int):
self.game_running = False
self.mission_completed = False
self.want_close = False
self.can_read_game = False
self.last_received_update: int = 0
self.last_trade_cargo: set = set()
self.last_supply_used: int = 0
self.trade_reply_cooldown: int = 0
self.setup_done = False
self.ctx = ctx
self.ctx.last_bot = self
self.mission_id = mission_id
self.boni = [False for _ in range(MAX_BONUS)]
super(ArchipelagoBot, self).__init__()
async def on_step(self, iteration: int):
if self.want_close:
self.want_close = False
await self._client.leave()
return
game_state = 0
if not self.setup_done:
self.setup_done = True
mission = lookup_id_to_mission[self.mission_id]
start_items = calculate_items(self.ctx)
missions_beaten = self.missions_beaten_count()
kerrigan_level = get_kerrigan_level(self.ctx, start_items, missions_beaten)
kerrigan_options = calculate_kerrigan_options(self.ctx)
soa_options = caclulate_soa_options(self.ctx, mission)
generic_upgrade_options = calculate_generic_upgrade_options(self.ctx)
trade_options = calculate_trade_options(self.ctx)
mission_variant = get_mission_variant(self.mission_id) # 0/1/2/3 for unchanged/Terran/Zerg/Protoss
nova_fallback: bool
if MissionFlag.Nova in mission.flags:
nova_fallback = self.ctx.use_nova_nco_fallback
elif MissionFlag.WoLNova in mission.flags:
nova_fallback = self.ctx.use_nova_wol_fallback
else:
nova_fallback = False
uncollected_objectives: typing.List[int] = self.get_uncollected_objectives()
if self.ctx.difficulty_override >= 0:
difficulty = calc_difficulty(self.ctx.difficulty_override)
else:
difficulty = calc_difficulty(self.ctx.difficulty)
if self.ctx.game_speed_override >= 0:
game_speed = self.ctx.game_speed_override
else:
game_speed = self.ctx.game_speed
await self.chat_send(
"?SetOptions"
f" {difficulty}"
f" {generic_upgrade_options}"
f" {self.ctx.all_in_choice}"
f" {game_speed}"
f" {self.ctx.disable_forced_camera}"
f" {self.ctx.skip_cutscenes}"
f" {kerrigan_options}"
f" {self.ctx.grant_story_tech}"
f" {self.ctx.take_over_ai_allies}"
f" {soa_options}"
f" {self.ctx.mission_order}"
f" {int(nova_fallback)}"
f" {self.ctx.grant_story_levels}"
f" {self.ctx.enable_morphling}"
f" {mission_variant}"
f" {trade_options}"
f" {self.ctx.difficulty_damage_modifier}"
f" {self.ctx.mercenary_highlanders}" # TODO: Possibly rework it into unit options in the next cycle
f" {self.ctx.war_council_nerfs}"
)
await self.update_resources(start_items)
await self.update_terran_tech(start_items)
await self.update_zerg_tech(start_items, kerrigan_level)
await self.update_protoss_tech(start_items)
await self.update_misc_tech(start_items)
await self.update_colors()
if uncollected_objectives:
await self.chat_send("?UncollectedLocations {}".format(
functools.reduce(lambda a, b: a + " " + b, [str(x) for x in uncollected_objectives])
))
await self.chat_send("?LoadFinished")
self.last_received_update = len(self.ctx.items_received)
else:
if self.ctx.pending_color_update:
await self.update_colors()
if not self.ctx.announcements.empty():
message = self.ctx.announcements.get(timeout=1)
await self.chat_send("?SendMessage " + message)
self.ctx.announcements.task_done()
# Archipelago reads the health
controller1_state = 0
controller2_state = 0
for unit in self.all_own_units():
if unit.health_max == CONTROLLER_HEALTH:
controller1_state = int(CONTROLLER_HEALTH - unit.health)
self.can_read_game = True
elif unit.health_max == CONTROLLER2_HEALTH:
controller2_state = int(CONTROLLER2_HEALTH - unit.health)
self.can_read_game = True
elif unit.name == TRADE_UNIT:
# Handle Void Trade requests
# Check for orders (for buildings this is usually research or training)
if not unit.is_idle and not self.ctx.trade_underway:
button = unit.orders[0].ability.button_name
if button == TRADE_SEND_BUTTON and len(self.last_trade_cargo) > 0:
units_to_send: typing.List[str] = []
non_ap_units: typing.Set[str] = set()
for passenger in self.last_trade_cargo:
# Alternatively passenger._type_data.name but passenger.name seems to always match
unit_name = passenger.name
if unit_name.startswith("AP_"):
units_to_send.append(normalized_unit_types.get(unit_name, unit_name))
else:
non_ap_units.add(unit_name)
if len(non_ap_units) > 0:
sc2_logger.info(f"Void Trade tried to send non-AP units: {', '.join(non_ap_units)}")
self.ctx.trade_response = "?TradeFail Void Trade rejected: Trade contains invalid units."
self.ctx.trade_underway = True
else:
self.ctx.trade_response = None
self.ctx.trade_underway = True
async_start(self.ctx.trade_send(units_to_send))
elif button == TRADE_RECEIVE_1_BUTTON:
self.ctx.trade_underway = True
if self.supply_used != self.last_supply_used:
self.ctx.trade_response = None
async_start(self.ctx.trade_receive(1))
else:
self.ctx.trade_response = "?TradeFail Void Trade rejected: Not enough supply."
elif button == TRADE_RECEIVE_5_BUTTON:
self.ctx.trade_underway = True
if self.supply_used != self.last_supply_used:
self.ctx.trade_response = None
async_start(self.ctx.trade_receive(5))
else:
self.ctx.trade_response = "?TradeFail Void Trade rejected: Not enough supply."
elif not unit.is_idle and self.trade_reply_cooldown > 0:
self.trade_reply_cooldown -= 1
elif unit.is_idle and self.trade_reply_cooldown > 0:
self.trade_reply_cooldown = 0
self.ctx.trade_response = None
self.ctx.trade_underway = False
else:
# The API returns no passengers for researching/training buildings,
# so we need to buffer the passengers each frame
self.last_trade_cargo = unit.passengers
# SC2 has no good means of detecting when a unit is queued while supply capped,
# so a supply buffer here is the best we can do
self.last_supply_used = self.supply_used
game_state = controller1_state + (controller2_state << 15)
if iteration == 160 and not game_state & 1:
await self.chat_send("?SendMessage Warning: Archipelago unable to connect or has lost connection to " +
"Starcraft 2 (This is likely a map issue)")
if self.last_received_update < len(self.ctx.items_received):
current_items = calculate_items(self.ctx)
missions_beaten = self.missions_beaten_count()
kerrigan_level = get_kerrigan_level(self.ctx, current_items, missions_beaten)
await self.update_resources(current_items)
await self.update_terran_tech(current_items)
await self.update_zerg_tech(current_items, kerrigan_level)
await self.update_protoss_tech(current_items)
await self.update_misc_tech(current_items)
self.last_received_update = len(self.ctx.items_received)
if game_state & 1:
if not self.game_running:
print("Archipelago Connected")
self.game_running = True
if self.can_read_game:
if game_state & (1 << 1) and not self.mission_completed:
victory_locations = [get_location_id(self.mission_id, 0)]
send_victory = (
self.mission_id in self.ctx.final_mission_ids and
len(self.ctx.final_locations) == len(self.ctx.checked_locations.union(victory_locations).intersection(self.ctx.final_locations))
)
# Old slots don't have locations on goal
if not send_victory or self.ctx.slot_data_version >= 4:
sc2_logger.info("Mission Completed")
location_ids = self.ctx.mission_id_to_location_ids[self.mission_id]
victory_locations += sorted([
get_location_id(self.mission_id, location_id)
for location_id in location_ids
if (location_id % VICTORY_MODULO) >= VICTORY_CACHE_OFFSET
])
await self.ctx.send_msgs(
[{"cmd": 'LocationChecks',
"locations": victory_locations}])
self.mission_completed = True
if send_victory:
print("Game Complete")
await self.ctx.send_msgs([{"cmd": 'StatusUpdate', "status": ClientStatus.CLIENT_GOAL}])
self.mission_completed = True
self.ctx.finished_game = True
for x, completed in enumerate(self.boni):
if not completed and game_state & (1 << (x + 2)):
await self.ctx.send_msgs(
[{"cmd": 'LocationChecks',
"locations": [get_location_id(self.mission_id, x + 1)]}])
self.boni[x] = True
# Send Void Trade results
if self.ctx.trade_response is not None and self.trade_reply_cooldown == 0:
await self.chat_send(self.ctx.trade_response)
# Wait an arbitrary amount of frames before trying again
self.trade_reply_cooldown = 60
else:
await self.chat_send("?SendMessage LostConnection - Lost connection to game.")
def get_uncollected_objectives(self) -> typing.List[int]:
result = [
location % VICTORY_MODULO
for location in self.ctx.uncollected_locations_in_mission(lookup_id_to_mission[self.mission_id])
if (location % VICTORY_MODULO) < VICTORY_CACHE_OFFSET
]
return result
def missions_beaten_count(self) -> int:
return len([location for location in self.ctx.checked_locations if location % VICTORY_MODULO == 0])
async def update_colors(self):
await self.chat_send("?SetColor rr " + str(self.ctx.player_color_raynor))
await self.chat_send("?SetColor ks " + str(self.ctx.player_color_zerg))
await self.chat_send("?SetColor pz " + str(self.ctx.player_color_zerg_primal))
await self.chat_send("?SetColor da " + str(self.ctx.player_color_protoss))
await self.chat_send("?SetColor nova " + str(self.ctx.player_color_nova))
self.ctx.pending_color_update = False
async def update_resources(self, current_items: typing.Dict[SC2Race, typing.List[int]]):
DEFAULT_MAX_SUPPLY = 200
max_supply_amount = max(
DEFAULT_MAX_SUPPLY
+ (
current_items[SC2Race.ANY][get_item_flag_word(item_names.MAX_SUPPLY)]
* self.ctx.maximum_supply_per_item
)
- (
current_items[SC2Race.ANY][get_item_flag_word(item_names.REDUCED_MAX_SUPPLY)]
* self.ctx.maximum_supply_reduction_per_item
),
self.ctx.lowest_maximum_supply,
)
await self.chat_send("?GiveResources {} {} {} {}".format(
current_items[SC2Race.ANY][get_item_flag_word(item_names.STARTING_MINERALS)],
current_items[SC2Race.ANY][get_item_flag_word(item_names.STARTING_VESPENE)],
current_items[SC2Race.ANY][get_item_flag_word(item_names.STARTING_SUPPLY)],
max_supply_amount - DEFAULT_MAX_SUPPLY,
))
async def update_terran_tech(self, current_items: typing.Dict[SC2Race, typing.List[int]]):
terran_items = current_items[SC2Race.TERRAN]
await self.chat_send("?GiveTerranTech " + " ".join(map(str, terran_items)))
async def update_zerg_tech(self, current_items: typing.Dict[SC2Race, typing.List[int]], kerrigan_level: int):
zerg_items = current_items[SC2Race.ZERG]
zerg_items = [value for index, value in enumerate(zerg_items) if index not in [ZergItemType.Level.flag_word, ZergItemType.Primal_Form.flag_word]]
kerrigan_primal_by_items = kerrigan_primal(self.ctx, kerrigan_level)
kerrigan_primal_bot_value = 1 if kerrigan_primal_by_items else 0
await self.chat_send(f"?GiveZergTech {kerrigan_level} {kerrigan_primal_bot_value} " + ' '.join(map(str, zerg_items)))
async def update_protoss_tech(self, current_items: typing.Dict[SC2Race, typing.List[int]]):
protoss_items = current_items[SC2Race.PROTOSS]
await self.chat_send("?GiveProtossTech " + " ".join(map(str, protoss_items)))
async def update_misc_tech(self, current_items: typing.Dict[SC2Race, typing.List[int]]):
await self.chat_send("?GiveMiscTech {} {} {}".format(
current_items[SC2Race.ANY][get_item_flag_word(item_names.BUILDING_CONSTRUCTION_SPEED)],
current_items[SC2Race.ANY][get_item_flag_word(item_names.UPGRADE_RESEARCH_SPEED)],
current_items[SC2Race.ANY][get_item_flag_word(item_names.UPGRADE_RESEARCH_COST)],
))
def calc_unfinished_nodes(
ctx: SC2Context
) -> typing.Tuple[typing.List[int], typing.Dict[int, typing.List[int]], typing.List[int], typing.Set[int]]:
unfinished_missions: typing.Set[int] = set()
available_missions, available_layouts, available_campaigns = calc_available_nodes(ctx)
for mission_id in available_missions:
objectives = set(ctx.locations_for_mission_id(mission_id))
if objectives:
objectives_completed = ctx.checked_locations & objectives
if len(objectives_completed) < len(objectives):
unfinished_missions.add(mission_id)
return available_missions, available_layouts, available_campaigns, unfinished_missions
def is_mission_available(ctx: SC2Context, mission_id_to_check: int) -> bool:
available_missions, _, _ = calc_available_nodes(ctx)
return mission_id_to_check in available_missions
def calc_available_nodes(ctx: SC2Context) -> typing.Tuple[typing.List[int], typing.Dict[int, typing.List[int]], typing.List[int]]:
beaten_missions: typing.Set[int] = {mission_id for mission_id in ctx.mission_id_to_entry_rules if ctx.is_mission_completed(mission_id)}
received_items = compute_received_items(ctx)
mission_order_objects: typing.List[MissionOrderObjectSlotData] = []
parent_objects: typing.List[typing.List[MissionOrderObjectSlotData]] = []
for campaign in ctx.custom_mission_order:
mission_order_objects.append(campaign)
parent_objects.append([])
for layout in campaign.layouts:
mission_order_objects.append(layout)
parent_objects.append([campaign])
for column in layout.missions:
for mission in column:
if mission.mission_id == -1:
continue
mission_order_objects.append(mission)
parent_objects.append([campaign, layout])
candidate_accessible_objects: typing.List[MissionOrderObjectSlotData] = [
mission_order_object for mission_order_object in mission_order_objects
if mission_order_object.entry_rule.is_accessible(beaten_missions, received_items)
]
accessible_objects: typing.List[MissionOrderObjectSlotData] = []
while len(candidate_accessible_objects) > 0:
accessible_missions: typing.List[MissionSlotData] = [mission_order_object for mission_order_object in accessible_objects if isinstance(mission_order_object, MissionSlotData)]
beaten_accessible_missions: typing.Set[int] = {mission.mission_id for mission in accessible_missions if mission.mission_id in beaten_missions}
accessible_objects_to_add: typing.List[MissionOrderObjectSlotData] = []
for mission_order_object in candidate_accessible_objects:
if (
mission_order_object.entry_rule.is_accessible(beaten_accessible_missions, received_items)
and all([
parent_object.entry_rule.is_accessible(beaten_accessible_missions, received_items)
for parent_object in parent_objects[mission_order_objects.index(mission_order_object)]
])
):
accessible_objects_to_add.append(mission_order_object)
if len(accessible_objects_to_add) > 0:
accessible_objects.extend(accessible_objects_to_add)
candidate_accessible_objects = [
mission_order_object for mission_order_object in candidate_accessible_objects
if mission_order_object not in accessible_objects_to_add
]
else:
break
accessible_missions: typing.List[MissionSlotData] = [mission_order_object for mission_order_object in accessible_objects if isinstance(mission_order_object, MissionSlotData)]
beaten_accessible_missions: typing.Set[int] = {mission.mission_id for mission in accessible_missions if mission.mission_id in beaten_missions}
for mission_order_object in mission_order_objects:
# re-generate tooltip accessibility
for sub_rule in mission_order_object.entry_rule.sub_rules:
sub_rule.was_accessible = False
mission_order_object.entry_rule.is_accessible(beaten_accessible_missions, received_items)
available_missions: typing.List[int] = [
mission_order_object.mission_id for mission_order_object in accessible_objects
if isinstance(mission_order_object, MissionSlotData)
]
available_campaign_objects: typing.List[CampaignSlotData] = [
mission_order_object for mission_order_object in accessible_objects
if isinstance(mission_order_object, CampaignSlotData)
]
available_campaigns: typing.List[int] = [
campaign_idx for campaign_idx, campaign in enumerate(ctx.custom_mission_order)
if campaign in available_campaign_objects
]
available_layout_objects: typing.List[LayoutSlotData] = [
mission_order_object for mission_order_object in accessible_objects
if isinstance(mission_order_object, LayoutSlotData)
]
available_layouts: typing.Dict[int, typing.List[int]] = {
campaign_idx: [
layout_idx for layout_idx, layout in enumerate(campaign.layouts) if layout in available_layout_objects
]
for campaign_idx, campaign in enumerate(ctx.custom_mission_order)
}
return available_missions, available_layouts, available_campaigns
def compute_received_items(ctx: SC2Context) -> typing.Counter[int]:
received_items: typing.Counter[int] = collections.Counter()
for network_item in ctx.items_received:
received_items[network_item.item] += 1
return received_items
def check_game_install_path() -> bool:
# First thing: go to the default location for ExecuteInfo.
# An exception for Windows is included because it's very difficult to find ~\Documents if the user moved it.
if is_windows:
# The next five lines of utterly inscrutable code are brought to you by copy-paste from Stack Overflow.
# https://stackoverflow.com/questions/6227590/finding-the-users-my-documents-path/30924555#
import ctypes.wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes.create_unicode_buffer(ctypes.wintypes.MAX_PATH)
ctypes.windll.shell32.SHGetFolderPathW(None, CSIDL_PERSONAL, None, SHGFP_TYPE_CURRENT, buf)
documentspath: str = buf.value
einfo = str(documentspath / Path("StarCraft II\\ExecuteInfo.txt"))
else:
einfo = str(bot.paths.get_home() / Path(bot.paths.USERPATH[bot.paths.PF]))
# Check if the file exists.
if os.path.isfile(einfo):
# Open the file and read it, picking out the latest executable's path.
with open(einfo) as f:
content = f.read()
if content:
search_result = re.search(r" = (.*)Versions", content)
if not search_result:
sc2_logger.warning(f"Found {einfo}, but it was empty. Run SC2 through the Blizzard launcher, "
"then try again.")
return False
base = search_result.group(1)
if os.path.exists(base):
executable = bot.paths.latest_executeble(Path(base).expanduser() / "Versions")
# Finally, check the path for an actual executable.
# If we find one, great. Set up the SC2PATH.
if os.path.isfile(executable):
sc2_logger.info(f"Found an SC2 install at {base}!")
sc2_logger.debug(f"Latest executable at {executable}.")
os.environ["SC2PATH"] = base
sc2_logger.debug(f"SC2PATH set to {base}.")
return True
else:
sc2_logger.warning(f"We may have found an SC2 install at {base}, but couldn't find {executable}.")
else:
sc2_logger.warning(f"{einfo} pointed to {base}, but we could not find an SC2 install there.")
else:
sc2_logger.warning(f"Couldn't find {einfo}. Run SC2 through the Blizzard launcher, then try again. "
f"If that fails, please run /set_path with your SC2 install directory.")
return False
def is_mod_installed_correctly() -> bool:
"""Searches for all required files."""
if "SC2PATH" not in os.environ:
check_game_install_path()
sc2_path: str = os.environ["SC2PATH"]
mapdir = sc2_path / Path('Maps/ArchipelagoCampaign')
mods = ["ArchipelagoCore", "ArchipelagoPlayer", "ArchipelagoPlayerSuper", "ArchipelagoPatches",
"ArchipelagoTriggers", "ArchipelagoPlayerWoL", "ArchipelagoPlayerHotS",
"ArchipelagoPlayerLotV", "ArchipelagoPlayerLotVPrologue", "ArchipelagoPlayerNCO"]
modfiles = [sc2_path / Path("Mods/" + mod + ".SC2Mod") for mod in mods]
wol_required_maps: typing.List[str] = ["WoL" + os.sep + mission.map_file + ".SC2Map" for mission in SC2Mission
if mission.campaign in (SC2Campaign.WOL, SC2Campaign.PROPHECY)]
hots_required_maps: typing.List[str] = ["HotS" + os.sep + mission.map_file + ".SC2Map" for mission in campaign_mission_table[SC2Campaign.HOTS]]
lotv_required_maps: typing.List[str] = ["LotV" + os.sep + mission.map_file + ".SC2Map" for mission in SC2Mission
if mission.campaign in (SC2Campaign.LOTV, SC2Campaign.PROLOGUE, SC2Campaign.EPILOGUE)]
nco_required_maps: typing.List[str] = ["NCO" + os.sep + mission.map_file + ".SC2Map" for mission in campaign_mission_table[SC2Campaign.NCO]]
required_maps = wol_required_maps + hots_required_maps + lotv_required_maps + nco_required_maps
needs_files = False
# Check for maps.
missing_maps: typing.List[str] = []
for mapfile in required_maps:
if not os.path.isfile(mapdir / mapfile):
missing_maps.append(mapfile)
if len(missing_maps) >= 19:
sc2_logger.warning(f"All map files missing from {mapdir}.")
needs_files = True
elif len(missing_maps) > 0:
for map in missing_maps:
sc2_logger.debug(f"Missing {map} from {mapdir}.")
sc2_logger.warning(f"Missing {len(missing_maps)} map files.")
needs_files = True
else: # Must be no maps missing
sc2_logger.debug(f"All maps found in {mapdir}.")
# Check for mods.
for modfile in modfiles:
if os.path.isfile(modfile) or os.path.isdir(modfile):
sc2_logger.debug(f"Archipelago mod found at {modfile}.")
else:
sc2_logger.warning(f"Archipelago mod could not be found at {modfile}.")
needs_files = True
# Final verdict.
if needs_files:
sc2_logger.warning("Required files are missing. Run /download_data to acquire them.")
return False
else:
sc2_logger.debug("All map/mod files are properly installed.")
return True
class DllDirectory:
# Credit to Black Sliver for this code.
# More info: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setdlldirectoryw
_old: typing.Optional[str] = None
_new: typing.Optional[str] = None
def __init__(self, new: typing.Optional[str]):
self._new = new
def __enter__(self):
old = self.get()
if self.set(self._new):
self._old = old
def __exit__(self, *args):
if self._old is not None:
self.set(self._old)
@staticmethod
def get() -> typing.Optional[str]:
if sys.platform == "win32":
n = ctypes.windll.kernel32.GetDllDirectoryW(0, None)
buf = ctypes.create_unicode_buffer(n)
ctypes.windll.kernel32.GetDllDirectoryW(n, buf)
return buf.value
# NOTE: other OS may support os.environ["LD_LIBRARY_PATH"], but this fix is windows-specific
return None
@staticmethod
def set(s: typing.Optional[str]) -> bool:
if sys.platform == "win32":
return ctypes.windll.kernel32.SetDllDirectoryW(s) != 0
# NOTE: other OS may support os.environ["LD_LIBRARY_PATH"], but this fix is windows-specific
return False
def download_latest_release_zip(
owner: str,
repo: str,
api_version: str,
metadata: typing.Optional[str] = None,
force_download=False
) -> typing.Tuple[str, typing.Optional[str]]:
"""Downloads the latest release of a GitHub repo to the current directory as a .zip file."""
import requests
headers = {"Accept": 'application/vnd.github.v3+json'}
url = f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{api_version}"
try:
r1 = requests.get(url, headers=headers)
if r1.status_code == 200:
latest_metadata = r1.json()
cleanup_downloaded_metadata(latest_metadata)
latest_metadata = str(latest_metadata)
# sc2_logger.info(f"Latest version: {latest_metadata}.")
else:
sc2_logger.warning(f"Status code: {r1.status_code}")
sc2_logger.warning("Failed to reach GitHub. Could not find download link.")
sc2_logger.warning(f"text: {r1.text}")
return "", metadata
if (force_download is False) and (metadata == latest_metadata):
sc2_logger.info("Latest version already installed.")
return "", metadata
sc2_logger.info(f"Attempting to download latest version of API version {api_version} of {repo}.")
download_url = r1.json()["assets"][0]["browser_download_url"]
r2 = requests.get(download_url, headers=headers)
if r2.status_code == 200 and zipfile.is_zipfile(io.BytesIO(r2.content)):
tempdir = tempfile.gettempdir()
file = tempdir + os.sep + f"{repo}.zip"
with open(file, "wb") as fh:
fh.write(r2.content)
sc2_logger.info(f"Successfully downloaded {repo}.zip. Installing...")
return file, latest_metadata
else:
sc2_logger.warning(f"Status code: {r2.status_code}")
sc2_logger.warning("Download failed.")
sc2_logger.warning(f"text: {r2.text}")
return "", metadata
except requests.ConnectionError:
sc2_logger.warning("Failed to reach GitHub. Could not find download link.")
return "", metadata
def cleanup_downloaded_metadata(medatada_json: dict) -> None:
for asset in medatada_json['assets']:
del asset['download_count']
def is_mod_update_available(owner: str, repo: str, api_version: str, metadata: str) -> bool:
import requests
headers = {"Accept": 'application/vnd.github.v3+json'}
url = f"https://api.github.com/repos/{owner}/{repo}/releases/tags/{api_version}"
try:
r1 = requests.get(url, headers=headers)
if r1.status_code == 200:
latest_metadata = r1.json()
cleanup_downloaded_metadata(latest_metadata)
latest_metadata = str(latest_metadata)
if metadata != latest_metadata:
return True
else:
return False
else:
sc2_logger.warning("Failed to reach GitHub while checking for updates.")
sc2_logger.warning(f"Status code: {r1.status_code}")
sc2_logger.warning(f"text: {r1.text}")
return False
except requests.ConnectionError:
sc2_logger.warning("Failed to reach GitHub while checking for updates.")
return False
def get_location_offset(mission_id: int) -> int:
return SC2WOL_LOC_ID_OFFSET if mission_id <= SC2Mission.ALL_IN.id \
else (SC2HOTS_LOC_ID_OFFSET - SC2Mission.ALL_IN.id * VICTORY_MODULO)
def get_location_id(mission_id: int, objective_id: int) -> int:
return get_location_offset(mission_id) + mission_id * VICTORY_MODULO + objective_id
_has_forced_save = False
def force_settings_save_on_close() -> None:
"""
Settings has an existing auto-save feature, but it only triggers if a new key was introduced.
Force it to mark things as changed by introducing a new key and then cleaning up.
"""
global _has_forced_save
if _has_forced_save:
return
SC2World.settings.update({'invalid_attribute': True})
del SC2World.settings.invalid_attribute
_has_forced_save = True
def launch(*args: str):
colorama.just_fix_windows_console()
asyncio.run(main(args))
colorama.deinit()