2025-09-02 17:40:58 +02:00
from __future__ import annotations
import asyncio
import collections
import copy
import ctypes
import enum
import functools
import inspect
import logging
import multiprocessing
import os . path
import re
import sys
import tempfile
import typing
import queue
import zipfile
import io
import random
import concurrent . futures
import time
import uuid
2025-09-30 09:34:26 -07:00
import argparse
2025-09-02 17:40:58 +02:00
from pathlib import Path
# CommonClient import first to trigger ModuleUpdater
2025-09-30 09:34:26 -07:00
from CommonClient import CommonContext , server_loop , ClientCommandProcessor , gui_enabled , get_base_parser , handle_url_arg
2025-09-02 17:40:58 +02:00
from Utils import init_logging , is_windows , async_start
from . item import item_names , item_parents , race_to_item_type
from . item . item_annotations import ITEM_NAME_ANNOTATIONS
from . item . item_groups import item_name_groups , unlisted_item_name_groups , ItemGroupNames
from . import options , VICTORY_MODULO
from . options import (
MissionOrder , KerriganPrimalStatus , kerrigan_unit_available , KerriganPresence , EnableMorphling , GameDifficulty ,
GameSpeed , GenericUpgradeItems , GenericUpgradeResearch , ColorChoice , GenericUpgradeMissions , MaxUpgradeLevel ,
LocationInclusion , ExtraLocations , MasteryLocations , SpeedrunLocations , PreventativeLocations , ChallengeLocations ,
VanillaLocations ,
DisableForcedCamera , SkipCutscenes , GrantStoryTech , GrantStoryLevels , TakeOverAIAllies , RequiredTactics ,
SpearOfAdunPresence , SpearOfAdunPresentInNoBuild , SpearOfAdunPassiveAbilityPresence ,
SpearOfAdunPassivesPresentInNoBuild , EnableVoidTrade , VoidTradeAgeLimit , void_trade_age_limits_ms , VoidTradeWorkers ,
DifficultyDamageModifier , MissionOrderScouting , GenericUpgradeResearchSpeedup , MercenaryHighlanders , WarCouncilNerfs ,
is_mission_in_soa_presence ,
)
from . mission_order . slot_data import CampaignSlotData , LayoutSlotData , MissionSlotData , MissionOrderObjectSlotData
from . mission_order . entry_rules import SubRuleRuleData , CountMissionsRuleData , MissionEntryRules
from . mission_tables import MissionFlag
from . transfer_data import normalized_unit_types , worker_units
from . import SC2World
if __name__ == " __main__ " :
init_logging ( " SC2Client " , exception_logger = " Client " )
logger = logging . getLogger ( " Client " )
sc2_logger = logging . getLogger ( " Starcraft2 " )
import nest_asyncio
from worlds . _sc2common import bot
from worlds . _sc2common . bot . data import Race
from worlds . _sc2common . bot . main import run_game
from worlds . _sc2common . bot . player import Bot
from . item . item_tables import (
lookup_id_to_name , get_full_item_list , ItemData ,
ZergItemType , upgrade_bundles ,
WEAPON_ARMOR_UPGRADE_MAX_LEVEL ,
)
from . locations import SC2WOL_LOC_ID_OFFSET , LocationType , LocationFlag , SC2HOTS_LOC_ID_OFFSET , VICTORY_CACHE_OFFSET
from . mission_tables import (
lookup_id_to_mission , SC2Campaign , MissionInfo ,
lookup_id_to_campaign , SC2Mission , campaign_mission_table , SC2Race
)
import colorama
from . options import Option , upgrade_included_names
from NetUtils import ClientStatus , NetworkItem , JSONtoTextParser , JSONMessagePart , add_json_item , add_json_location , add_json_text , JSONTypes
from MultiServer import mark_raw
pool = concurrent . futures . ThreadPoolExecutor ( 1 )
loop = asyncio . get_event_loop_policy ( ) . new_event_loop ( )
nest_asyncio . apply ( loop )
MAX_BONUS : int = 28
# GitHub repo where the Map/mod data is hosted for /download_data command
DATA_REPO_OWNER = " Ziktofel "
DATA_REPO_NAME = " Archipelago-SC2-data "
DATA_API_VERSION = " API4 "
# Bot controller
CONTROLLER_HEALTH : int = 38281
CONTROLLER2_HEALTH : int = 38282
# Void Trade
TRADE_UNIT = " AP_TradeStructure " # ID of the unit
TRADE_SEND_BUTTON = " AP_TradeStructureDummySend " # ID of the button
TRADE_RECEIVE_1_BUTTON = " AP_TradeStructureDummyReceive " # ID of the button
TRADE_RECEIVE_5_BUTTON = " AP_TradeStructureDummyReceive5 " # ID of the button
TRADE_DATASTORAGE_TEAM = " SC2_VoidTrade_ " # + Team
TRADE_DATASTORAGE_SLOT = " slot_ " # + Slot
TRADE_DATASTORAGE_LOCK = " _lock "
TRADE_LOCK_TIME = 5 # Time in seconds that the DataStorage may be considered safe to edit
TRADE_LOCK_WAIT_LIMIT = 540000 / 1.4 # Time in ms that the client may spend trying to get a lock (540000 = 9 minutes, 1.4 is 'faster' game speed's time scale)
# Games
STARCRAFT2 = " Starcraft 2 "
STARCRAFT2_WOL = " Starcraft 2 Wings of Liberty "
# Data version file path.
# This file is used to tell if the downloaded data are outdated
# Associated with /download_data command
def get_metadata_file ( ) - > str :
return os . environ [ " SC2PATH " ] + os . sep + " ArchipelagoSC2Metadata.txt "
def _remap_color_option ( slot_data_version : int , color : int ) - > int :
""" Remap colour options for backwards compatibility with older slot data """
if slot_data_version < 4 and color == ColorChoice . option_mengsk :
return ColorChoice . option_default
return color
class ConfigurableOptionType ( enum . Enum ) :
INTEGER = enum . auto ( )
ENUM = enum . auto ( )
class ConfigurableOptionInfo ( typing . NamedTuple ) :
name : str
variable_name : str
option_class : typing . Type [ Option ]
option_type : ConfigurableOptionType = ConfigurableOptionType . ENUM
can_break_logic : bool = False
class ColouredMessage :
def __init__ ( self , text : str = ' ' , * , keep_markup : bool = False ) - > None :
self . parts : typing . List [ dict ] = [ ]
if text :
self ( text , keep_markup = keep_markup )
def __call__ ( self , text : str , * , keep_markup : bool = False ) - > ' ColouredMessage ' :
add_json_text ( self . parts , text , keep_markup = keep_markup )
return self
def coloured ( self , text : str , colour : str , * , keep_markup : bool = False ) - > ' ColouredMessage ' :
add_json_text ( self . parts , text , type = " color " , color = colour , keep_markup = keep_markup )
return self
def location ( self , location_id : int , player_id : int ) - > ' ColouredMessage ' :
add_json_location ( self . parts , location_id , player_id )
return self
def item ( self , item_id : int , player_id : int , flags : int = 0 ) - > ' ColouredMessage ' :
add_json_item ( self . parts , item_id , player_id , flags )
return self
def player ( self , player_id : int ) - > ' ColouredMessage ' :
add_json_text ( self . parts , str ( player_id ) , type = JSONTypes . player_id )
return self
def send ( self , ctx : SC2Context ) - > None :
ctx . on_print_json ( { " data " : self . parts , " cmd " : " PrintJSON " } )
class StarcraftClientProcessor ( ClientCommandProcessor ) :
ctx : SC2Context
def formatted_print ( self , text : str ) - > None :
""" Prints with kivy formatting to the GUI, and also prints to command-line and to all logs """
# Note(mm): Bold/underline can help readability, but unfortunately the CommonClient does not filter bold tags from command-line output.
# Regardless, using `on_print_json` to get formatted text in the GUI and output in the command-line and in the logs,
# without having to branch code from CommonClient
self . ctx . on_print_json ( { " data " : [ { " text " : text , " keep_markup " : True } ] } )
def _cmd_difficulty ( self , difficulty : str = " " ) - > bool :
""" Overrides the current difficulty set for the world. Takes the argument casual, normal, hard, or brutal """
arguments = difficulty . split ( )
num_arguments = len ( arguments )
if num_arguments > 0 :
difficulty_choice = arguments [ 0 ] . lower ( )
if difficulty_choice == " casual " :
self . ctx . difficulty_override = 0
elif difficulty_choice == " normal " :
self . ctx . difficulty_override = 1
elif difficulty_choice == " hard " :
self . ctx . difficulty_override = 2
elif difficulty_choice == " brutal " :
self . ctx . difficulty_override = 3
else :
self . output ( " Unable to parse difficulty ' " + arguments [ 0 ] + " ' " )
return False
self . output ( " Difficulty set to " + arguments [ 0 ] )
return True
else :
if self . ctx . difficulty == - 1 :
self . output ( " Please connect to a seed before checking difficulty. " )
else :
current_difficulty = self . ctx . difficulty
if self . ctx . difficulty_override > = 0 :
current_difficulty = self . ctx . difficulty_override
self . output ( " Current difficulty: " + [ " Casual " , " Normal " , " Hard " , " Brutal " ] [ current_difficulty ] )
self . output ( " To change the difficulty, add the name of the difficulty after the command. " )
return False
def _cmd_game_speed ( self , game_speed : str = " " ) - > bool :
""" Overrides the current game speed for the world.
Takes the arguments default , slower , slow , normal , fast , faster """
arguments = game_speed . split ( )
num_arguments = len ( arguments )
if num_arguments > 0 :
speed_choice = arguments [ 0 ] . lower ( )
if speed_choice == " default " :
self . ctx . game_speed_override = 0
elif speed_choice == " slower " :
self . ctx . game_speed_override = 1
elif speed_choice == " slow " :
self . ctx . game_speed_override = 2
elif speed_choice == " normal " :
self . ctx . game_speed_override = 3
elif speed_choice == " fast " :
self . ctx . game_speed_override = 4
elif speed_choice == " faster " :
self . ctx . game_speed_override = 5
else :
self . output ( " Unable to parse game speed ' " + arguments [ 0 ] + " ' " )
return False
self . output ( " Game speed set to " + arguments [ 0 ] )
return True
else :
if self . ctx . game_speed == - 1 :
self . output ( " Please connect to a seed before checking game speed. " )
else :
current_speed = self . ctx . game_speed
if self . ctx . game_speed_override > = 0 :
current_speed = self . ctx . game_speed_override
self . output ( " Current game speed: "
+ [ " Default " , " Slower " , " Slow " , " Normal " , " Fast " , " Faster " ] [ current_speed ] )
self . output ( " To change the game speed, add the name of the speed after the command, "
" or Default to select based on difficulty. " )
return False
@mark_raw
def _cmd_received ( self , filter_search : str = " " ) - > bool :
""" List received items.
Pass in a parameter to filter the search by partial item name or exact item group .
Use ' /received recent <number> ' to list the last ' number ' items received ( default 20 ) . """
if self . ctx . slot is None :
self . formatted_print ( " Connect to a slot to view what items are received. " )
return True
if filter_search . casefold ( ) . startswith ( ' recent ' ) :
return self . _received_recent ( filter_search [ len ( ' recent ' ) : ] . strip ( ) )
# Groups must be matched case-sensitively, so we properly capitalize the search term
# eg. "Spear of Adun" over "Spear Of Adun" or "spear of adun"
# This fails a lot of item name matches, but those should be found by partial name match
group_filter = ' '
for group_name in item_name_groups :
if group_name in unlisted_item_name_groups :
continue
if filter_search . casefold ( ) == group_name . casefold ( ) :
group_filter = group_name
break
def item_matches_filter ( item_name : str ) - > bool :
# The filter can be an exact group name or a partial item name
# Partial item name can be matched case-insensitively
if filter_search . casefold ( ) in item_name . casefold ( ) :
return True
# The search term should already be formatted as a group name
if group_filter and item_name in item_name_groups [ group_filter ] :
return True
return False
items = get_full_item_list ( )
categorized_items : typing . Dict [ SC2Race , typing . List [ typing . Union [ int , str ] ] ] = { }
parent_to_child : typing . Dict [ typing . Union [ int , str ] , typing . List [ int ] ] = { }
items_received : typing . Dict [ int , typing . List [ NetworkItem ] ] = { }
for item in self . ctx . items_received :
items_received . setdefault ( item . item , [ ] ) . append ( item )
items_received_set = set ( items_received )
for item_data in items . values ( ) :
if item_data . parent :
parent_rule = item_parents . parent_present [ item_data . parent ]
if parent_rule . constraint_group is not None and parent_rule . constraint_group in items :
parent_to_child . setdefault ( items [ parent_rule . constraint_group ] . code , [ ] ) . append ( item_data . code )
continue
race = items [ parent_rule . parent_items ( ) [ 0 ] ] . race
categorized_items . setdefault ( race , [ ] )
if parent_rule . display_string not in categorized_items [ race ] :
categorized_items [ race ] . append ( parent_rule . display_string )
parent_to_child . setdefault ( parent_rule . display_string , [ ] ) . append ( item_data . code )
else :
categorized_items . setdefault ( item_data . race , [ ] ) . append ( item_data . code )
def display_info ( element : typing . Union [ SC2Race , str , int ] ) - > tuple :
""" Return (should display, name, type, children, sum(obtained), sum(matching filter)) """
have_item = isinstance ( element , int ) and element in items_received_set
if isinstance ( element , SC2Race ) :
children : typing . Sequence [ typing . Union [ str , int ] ] = categorized_items [ faction ]
name = element . name
elif isinstance ( element , int ) :
children = parent_to_child . get ( element , [ ] )
name = self . ctx . item_names . lookup_in_game ( element )
else :
assert isinstance ( element , str )
children = parent_to_child [ element ]
name = element
matches_filter = item_matches_filter ( name )
child_states = [ display_info ( child ) for child in children ]
return (
( have_item and matches_filter ) or any ( child_state [ 0 ] for child_state in child_states ) ,
name ,
element ,
child_states ,
sum ( child_state [ 4 ] for child_state in child_states ) + have_item ,
sum ( child_state [ 5 ] for child_state in child_states ) + ( have_item and matches_filter ) ,
)
def display_tree (
should_display : bool , name : str , element : typing . Union [ SC2Race , str , int ] , child_states : tuple , indent : int = 0
) - > None :
if not should_display :
return
assert self . ctx . slot is not None
indent_str = " " * indent
if isinstance ( element , SC2Race ) :
self . formatted_print ( f " [u] { name } [/u] " )
for child in child_states :
display_tree ( * child [ : 4 ] )
elif isinstance ( element , str ) :
ColouredMessage ( indent_str ) ( " - " ) . coloured ( name , " white " ) . send ( self . ctx )
for child in child_states :
display_tree ( * child [ : 4 ] , indent = indent + 2 )
elif isinstance ( element , int ) :
items = items_received . get ( element , [ ] )
if not items :
ColouredMessage ( indent_str ) ( " - " ) . coloured ( name , " red " ) ( " - not obtained " ) . send ( self . ctx )
for item in items :
( ColouredMessage ( indent_str ) ( ' - ' )
. item ( item . item , self . ctx . slot , flags = item . flags )
( " from " ) . location ( item . location , item . player )
( " by " ) . player ( item . player )
) . send ( self . ctx )
for child in child_states :
display_tree ( * child [ : 4 ] , indent = indent + 2 )
non_matching_descendents = sum ( child [ 5 ] - child [ 4 ] for child in children )
if non_matching_descendents > 0 :
self . formatted_print ( f " { indent_str } + { non_matching_descendents } child items that don ' t match the filter " )
item_types_obtained = 0
items_obtained_matching_filter = 0
for faction in SC2Race :
should_display , name , element , children , faction_items_obtained , faction_items_matching_filter = display_info ( faction )
item_types_obtained + = faction_items_obtained
items_obtained_matching_filter + = faction_items_matching_filter
display_tree ( should_display , name , element , children )
if filter_search == " " :
self . formatted_print ( f " [b]Obtained: { len ( self . ctx . items_received ) } items ( { item_types_obtained } types)[/b] " )
else :
self . formatted_print ( f " [b]Filter \" { filter_search } \" found { items_obtained_matching_filter } out of { item_types_obtained } obtained item types[/b] " )
return True
def _received_recent ( self , amount : str ) - > bool :
assert self . ctx . slot is not None
try :
display_amount = int ( amount )
except ValueError :
display_amount = 20
display_amount = min ( display_amount , len ( self . ctx . items_received ) )
self . formatted_print ( f " Last { display_amount } of { len ( self . ctx . items_received ) } items received (most recent last): " )
for item in self . ctx . items_received [ - display_amount : ] :
(
ColouredMessage ( )
. item ( item . item , self . ctx . slot , item . flags )
( " from " ) . location ( item . location , item . player )
( " by " ) . player ( item . player )
) . send ( self . ctx )
return True
def _cmd_option ( self , option_name : str = " " , option_value : str = " " ) - > None :
""" Sets a Starcraft game option that can be changed after generation. Use " /option list " to see all options. """
LOGIC_WARNING = " *Note changing this may result in logically unbeatable games* \n "
configurable_options = (
ConfigurableOptionInfo ( ' speed ' , ' game_speed ' , options . GameSpeed ) ,
ConfigurableOptionInfo ( ' kerrigan_presence ' , ' kerrigan_presence ' , options . KerriganPresence , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' kerrigan_level_cap ' , ' kerrigan_total_level_cap ' , options . KerriganTotalLevelCap , ConfigurableOptionType . INTEGER , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' kerrigan_mission_level_cap ' , ' kerrigan_levels_per_mission_completed_cap ' , options . KerriganLevelsPerMissionCompletedCap , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' kerrigan_levels_per_mission ' , ' kerrigan_levels_per_mission_completed ' , options . KerriganLevelsPerMissionCompleted , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' grant_story_levels ' , ' grant_story_levels ' , options . GrantStoryLevels , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' grant_story_tech ' , ' grant_story_tech ' , options . GrantStoryTech , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' control_ally ' , ' take_over_ai_allies ' , options . TakeOverAIAllies , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' soa_presence ' , ' spear_of_adun_presence ' , options . SpearOfAdunPresence , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' soa_in_nobuilds ' , ' spear_of_adun_present_in_no_build ' , options . SpearOfAdunPresentInNoBuild , can_break_logic = True ) ,
# Note(mm): Technically SOA passive presence is in the logic for Amon's Fall if Takeover AI Allies is true,
# but that's edge case enough I don't think we should warn about it.
ConfigurableOptionInfo ( ' soa_passive_presence ' , ' spear_of_adun_passive_ability_presence ' , options . SpearOfAdunPassiveAbilityPresence ) ,
ConfigurableOptionInfo ( ' soa_passives_in_nobuilds ' , ' spear_of_adun_passive_present_in_no_build ' , options . SpearOfAdunPassivesPresentInNoBuild ) ,
ConfigurableOptionInfo ( ' max_upgrade_level ' , ' max_upgrade_level ' , options . MaxUpgradeLevel , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' generic_upgrade_research ' , ' generic_upgrade_research ' , options . GenericUpgradeResearch ) ,
ConfigurableOptionInfo ( ' generic_upgrade_research_speedup ' , ' generic_upgrade_research_speedup ' , options . GenericUpgradeResearchSpeedup ) ,
ConfigurableOptionInfo ( ' minerals_per_item ' , ' minerals_per_item ' , options . MineralsPerItem , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' gas_per_item ' , ' vespene_per_item ' , options . VespenePerItem , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' supply_per_item ' , ' starting_supply_per_item ' , options . StartingSupplyPerItem , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' max_supply_per_item ' , ' maximum_supply_per_item ' , options . MaximumSupplyPerItem , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' reduced_supply_per_item ' , ' maximum_supply_reduction_per_item ' , options . MaximumSupplyReductionPerItem , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' lowest_max_supply ' , ' lowest_maximum_supply ' , options . LowestMaximumSupply , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' research_cost_per_item ' , ' research_cost_reduction_per_item ' , options . ResearchCostReductionPerItem , ConfigurableOptionType . INTEGER ) ,
ConfigurableOptionInfo ( ' no_forced_camera ' , ' disable_forced_camera ' , options . DisableForcedCamera ) ,
ConfigurableOptionInfo ( ' skip_cutscenes ' , ' skip_cutscenes ' , options . SkipCutscenes ) ,
ConfigurableOptionInfo ( ' enable_morphling ' , ' enable_morphling ' , options . EnableMorphling , can_break_logic = True ) ,
ConfigurableOptionInfo ( ' difficulty_damage_modifier ' , ' difficulty_damage_modifier ' , options . DifficultyDamageModifier ) ,
ConfigurableOptionInfo ( ' void_trade_age_limit ' , ' trade_age_limit ' , options . VoidTradeAgeLimit ) ,
ConfigurableOptionInfo ( ' void_trade_workers ' , ' trade_workers_allowed ' , options . VoidTradeWorkers ) ,
ConfigurableOptionInfo ( ' mercenary_highlanders ' , ' mercenary_highlanders ' , options . MercenaryHighlanders ) ,
)
WARNING_COLOUR = " salmon "
CMD_COLOUR = " slateblue "
boolean_option_map = {
' y ' : ' true ' , ' yes ' : ' true ' , ' n ' : ' false ' , ' no ' : ' false ' , ' true ' : ' true ' , ' false ' : ' false ' ,
}
help_message = ColouredMessage ( inspect . cleandoc ( """
Options
- - - - - - - - - - - - - - - - - - - -
""" ))( ' \n ' )
for option in configurable_options :
option_help_text = inspect . cleandoc ( option . option_class . __doc__ or " No description provided. " ) . split ( ' \n ' , 1 ) [ 0 ]
help_message . coloured ( option . name , CMD_COLOUR ) ( " : " + " | " . join ( option . option_class . options )
+ f " -- { option_help_text } \n " )
if option . can_break_logic :
help_message . coloured ( LOGIC_WARNING , WARNING_COLOUR )
help_message ( " -------------------- \n Enter an option without arguments to see its current value. \n " )
if not option_name or option_name == ' list ' or option_name == ' help ' :
help_message . send ( self . ctx )
return
for option in configurable_options :
if option_name == option . name :
option_value = boolean_option_map . get ( option_value . lower ( ) , option_value )
if not option_value :
pass
elif option . option_type == ConfigurableOptionType . ENUM and option_value in option . option_class . options :
self . ctx . __dict__ [ option . variable_name ] = option . option_class . options [ option_value ]
elif option . option_type == ConfigurableOptionType . INTEGER :
try :
self . ctx . __dict__ [ option . variable_name ] = int ( option_value , base = 0 )
except :
self . output ( f " { option_value } is not a valid integer " )
else :
self . output ( f " Unknown option value ' { option_value } ' " )
ColouredMessage ( f " { option . name } is ' { option . option_class . get_option_name ( self . ctx . __dict__ [ option . variable_name ] ) } ' " ) . send ( self . ctx )
break
else :
self . output ( f " Unknown option ' { option_name } ' " )
help_message . send ( self . ctx )
def _cmd_color ( self , faction : str = " " , color : str = " " ) - > None :
""" Changes the player color for a given faction. """
player_colors = [
" White " , " Red " , " Blue " , " Teal " ,
" Purple " , " Yellow " , " Orange " , " Green " ,
" LightPink " , " Violet " , " LightGrey " , " DarkGreen " ,
" Brown " , " LightGreen " , " DarkGrey " , " Pink " ,
" Rainbow " , " Mengsk " , " BrightLime " , " Arcane " , " Ember " , " HotPink " ,
" Random " , " Default "
]
var_names = {
' raynor ' : ' player_color_raynor ' ,
' kerrigan ' : ' player_color_zerg ' ,
' primal ' : ' player_color_zerg_primal ' ,
' protoss ' : ' player_color_protoss ' ,
' nova ' : ' player_color_nova ' ,
}
faction = faction . lower ( )
if not faction :
for faction_name , key in var_names . items ( ) :
self . output ( f " Current player color for { faction_name } : { player_colors [ self . ctx . __dict__ [ key ] ] } " )
self . output ( " To change your color, add the faction name and color after the command. " )
self . output ( " Available factions: " + ' , ' . join ( var_names ) )
self . output ( " Available colors: " + ' , ' . join ( player_colors ) )
return
elif faction not in var_names :
self . output ( f " Unknown faction ' { faction } ' . " )
self . output ( " Available factions: " + ' , ' . join ( var_names ) )
return
match_colors = [ player_color . lower ( ) for player_color in player_colors ]
if not color :
self . output ( f " Current player color for { faction } : { player_colors [ self . ctx . __dict__ [ var_names [ faction ] ] ] } " )
self . output ( " To change this faction ' s colors, add the name of the color after the command. " )
self . output ( " Available colors: " + ' , ' . join ( player_colors ) )
else :
if color . lower ( ) not in match_colors :
self . output ( color + " is not a valid color. Available colors: " + ' , ' . join ( player_colors ) )
return
if color . lower ( ) == " random " :
color = random . choice ( player_colors [ : - 2 ] )
self . ctx . __dict__ [ var_names [ faction ] ] = match_colors . index ( color . lower ( ) )
self . ctx . pending_color_update = True
self . output ( f " Color for { faction } set to " + player_colors [ self . ctx . __dict__ [ var_names [ faction ] ] ] )
def _cmd_windowed_mode ( self , value = " " ) - > None :
""" Controls whether sc2 will launch in Windowed mode. Persists across sessions. """
if not value :
sc2_logger . info ( " Use `/windowed_mode [true|false]` to set the windowed mode " )
elif value . casefold ( ) in ( ' t ' , ' true ' , ' yes ' , ' y ' ) :
SC2World . settings . game_windowed_mode = True
force_settings_save_on_close ( )
else :
SC2World . settings . game_windowed_mode = False
force_settings_save_on_close ( )
sc2_logger . info ( f " Windowed mode is: { SC2World . settings . game_windowed_mode } " )
def _cmd_disable_mission_check ( self ) - > bool :
""" Disables the check to see if a mission is available to play. Meant for co-op runs where one player can play
the next mission in a chain the other player is doing . """
self . ctx . missions_unlocked = True
sc2_logger . info ( " Mission check has been disabled " )
return True
@mark_raw
def _cmd_set_path ( self , path : str = ' ' ) - > bool :
""" Manually set the SC2 install directory (if the automatic detection fails). """
if path :
os . environ [ " SC2PATH " ] = path
is_mod_installed_correctly ( )
return True
else :
sc2_logger . warning ( " When using set_path, you must type the path to your SC2 install directory. " )
return False
def _cmd_download_data ( self ) - > bool :
""" Download the most recent release of the necessary files for playing SC2 with
Archipelago . Will overwrite existing files . """
pool . submit ( self . _download_data , self . ctx )
return True
@staticmethod
def _download_data ( ctx : SC2Context ) - > bool :
if " SC2PATH " not in os . environ :
check_game_install_path ( )
if os . path . exists ( get_metadata_file ( ) ) :
with open ( get_metadata_file ( ) , " r " ) as f :
metadata = f . read ( )
else :
metadata = None
tempzip , metadata = download_latest_release_zip (
DATA_REPO_OWNER , DATA_REPO_NAME , DATA_API_VERSION , metadata = metadata , force_download = True )
if tempzip :
try :
zipfile . ZipFile ( tempzip ) . extractall ( path = os . environ [ " SC2PATH " ] )
sc2_logger . info ( " Download complete. Package installed. " )
if metadata is not None :
with open ( get_metadata_file ( ) , " w " ) as f :
f . write ( metadata )
finally :
os . remove ( tempzip )
else :
sc2_logger . warning ( " Download aborted/failed. Read the log for more information. " )
return False
ctx . data_out_of_date = False
return True
class SC2JSONtoTextParser ( JSONtoTextParser ) :
def __init__ ( self , ctx : SC2Context ) - > None :
self . handlers = {
" ItemSend " : self . _handle_color ,
" ItemCheat " : self . _handle_color ,
" Hint " : self . _handle_color ,
}
super ( ) . __init__ ( ctx )
def _handle_color ( self , node : JSONMessagePart ) - > str :
codes = node [ " color " ] . split ( " ; " )
buffer = " " . join ( self . color_code ( code ) for code in codes if code in self . color_codes )
return buffer + self . _handle_text ( node ) + ' </c> '
def _handle_item_name ( self , node : JSONMessagePart ) - > str :
if self . ctx . slot_info [ node [ " player " ] ] . game == STARCRAFT2 :
annotation = ITEM_NAME_ANNOTATIONS . get ( node [ " text " ] )
if annotation is not None :
node [ " text " ] + = f " { annotation } "
return super ( ) . _handle_item_name ( node )
def color_code ( self , code : str ) - > str :
return ' <c val= " ' + self . color_codes [ code ] + ' " > '
class SC2Context ( CommonContext ) :
command_processor = StarcraftClientProcessor
game = STARCRAFT2
items_handling = 0b111
def __init__ ( self , * args , * * kwargs ) - > None :
super ( SC2Context , self ) . __init__ ( * args , * * kwargs )
self . raw_text_parser = SC2JSONtoTextParser ( self )
self . data_out_of_date : bool = False
self . difficulty = - 1
self . game_speed = - 1
self . disable_forced_camera = 0
self . skip_cutscenes = 0
self . all_in_choice = 0
self . mission_order = 0
self . player_color_raynor = ColorChoice . option_blue
self . player_color_zerg = ColorChoice . option_orange
self . player_color_zerg_primal = ColorChoice . option_purple
self . player_color_protoss = ColorChoice . option_blue
self . player_color_nova = ColorChoice . option_dark_grey
self . pending_color_update = False
self . kerrigan_presence : int = KerriganPresence . default
self . kerrigan_primal_status = 0
self . enable_morphling = EnableMorphling . default
self . custom_mission_order : typing . List [ CampaignSlotData ] = [ ]
self . mission_id_to_entry_rules : typing . Dict [ int , MissionEntryRules ]
self . final_mission_ids : typing . List [ int ] = [ 29 ]
self . final_locations : typing . List [ int ] = [ ]
self . announcements : queue . Queue = queue . Queue ( )
self . sc2_run_task : typing . Optional [ asyncio . Task ] = None
self . missions_unlocked : bool = False # allow launching missions ignoring requirements
self . max_upgrade_level : int = MaxUpgradeLevel . default
self . generic_upgrade_missions = 0
self . generic_upgrade_research = 0
self . generic_upgrade_research_speedup : int = GenericUpgradeResearchSpeedup . default
self . generic_upgrade_items = 0
self . location_inclusions : typing . Dict [ LocationType , int ] = { }
self . location_inclusions_by_flag : typing . Dict [ LocationFlag , int ] = { }
self . plando_locations : typing . List [ str ] = [ ]
self . difficulty_override = - 1
self . game_speed_override = - 1
self . mission_id_to_location_ids : typing . Dict [ int , typing . List [ int ] ] = { }
self . last_bot : typing . Optional [ ArchipelagoBot ] = None
self . slot_data_version = 2
self . required_tactics : int = RequiredTactics . default
self . grant_story_tech : int = GrantStoryTech . default
self . grant_story_levels : int = GrantStoryLevels . default
self . take_over_ai_allies : int = TakeOverAIAllies . default
self . spear_of_adun_presence = SpearOfAdunPresence . option_not_present
self . spear_of_adun_present_in_no_build = SpearOfAdunPresentInNoBuild . option_false
self . spear_of_adun_passive_ability_presence = SpearOfAdunPassiveAbilityPresence . option_not_present
self . spear_of_adun_passive_present_in_no_build = SpearOfAdunPassivesPresentInNoBuild . option_false
self . minerals_per_item : int = 15 # For backwards compat with games generated pre-0.4.5
self . vespene_per_item : int = 15 # For backwards compat with games generated pre-0.4.5
self . starting_supply_per_item : int = 2 # For backwards compat with games generated pre-0.4.5
self . maximum_supply_per_item : int = 2
self . maximum_supply_reduction_per_item : int = options . MaximumSupplyReductionPerItem . default
self . lowest_maximum_supply : int = options . LowestMaximumSupply . default
self . research_cost_reduction_per_item : int = options . ResearchCostReductionPerItem . default
self . use_nova_wol_fallback : bool = False
self . use_nova_nco_fallback : bool = False
self . mercenary_highlanders : bool = False
self . kerrigan_levels_per_mission_completed = 0
self . trade_enabled : int = EnableVoidTrade . default
self . trade_age_limit : int = VoidTradeAgeLimit . default
self . trade_workers_allowed : int = VoidTradeWorkers . default
self . trade_underway : bool = False
self . trade_latest_reply : typing . Optional [ dict ] = None
self . trade_reply_event = asyncio . Event ( )
self . trade_lock_wait : int = 0
self . trade_lock_start : typing . Optional [ float ] = None
self . trade_response : typing . Optional [ str ] = None
self . difficulty_damage_modifier : int = DifficultyDamageModifier . default
self . mission_order_scouting = MissionOrderScouting . option_none
self . mission_item_classification : typing . Optional [ typing . Dict [ str , int ] ] = None
self . war_council_nerfs : bool = False
async def server_auth ( self , password_requested : bool = False ) - > None :
self . game = STARCRAFT2
if password_requested and not self . password :
await super ( SC2Context , self ) . server_auth ( password_requested )
await self . get_username ( )
await self . send_connect ( )
if self . ui :
self . ui . first_check = True
def is_legacy_game ( self ) :
return self . game == STARCRAFT2_WOL
def event_invalid_game ( self ) :
if self . is_legacy_game ( ) :
self . game = STARCRAFT2
super ( ) . event_invalid_game ( )
else :
self . game = STARCRAFT2_WOL
async_start ( self . send_connect ( ) )
def trade_storage_team ( self ) - > str :
return f " { TRADE_DATASTORAGE_TEAM } { self . team } "
def trade_storage_slot ( self ) - > str :
return f " { TRADE_DATASTORAGE_SLOT } { self . slot } "
def _apply_host_settings_to_options ( self ) - > None :
if str ( SC2World . settings . game_difficulty ) . casefold ( ) == ' casual ' :
self . difficulty = GameDifficulty . option_casual
elif str ( SC2World . settings . game_difficulty ) . casefold ( ) == ' normal ' :
self . difficulty = GameDifficulty . option_normal
elif str ( SC2World . settings . game_difficulty ) . casefold ( ) == ' hard ' :
self . difficulty = GameDifficulty . option_hard
elif str ( SC2World . settings . game_difficulty ) . casefold ( ) == ' brutal ' :
self . difficulty = GameDifficulty . option_brutal
if str ( SC2World . settings . game_speed ) . casefold ( ) == ' slower ' :
self . game_speed = GameSpeed . option_slower
elif str ( SC2World . settings . game_speed ) . casefold ( ) == ' slow ' :
self . game_speed = GameSpeed . option_slow
elif str ( SC2World . settings . game_speed ) . casefold ( ) == ' normal ' :
self . game_speed = GameSpeed . option_normal
elif str ( SC2World . settings . game_speed ) . casefold ( ) == ' fast ' :
self . game_speed = GameSpeed . option_fast
elif str ( SC2World . settings . game_speed ) . casefold ( ) == ' faster ' :
self . game_speed = GameSpeed . option_faster
if str ( SC2World . settings . disable_forced_camera ) . casefold ( ) == ' true ' :
self . disable_forced_camera = DisableForcedCamera . option_true
elif str ( SC2World . settings . disable_forced_camera ) . casefold ( ) == ' false ' :
self . disable_forced_camera = DisableForcedCamera . option_false
if str ( SC2World . settings . skip_cutscenes ) . casefold ( ) == ' true ' :
self . skip_cutscenes = SkipCutscenes . option_true
elif str ( SC2World . settings . skip_cutscenes ) . casefold ( ) == ' false ' :
self . skip_cutscenes = SkipCutscenes . option_false
def on_package ( self , cmd : str , args : dict ) - > None :
if cmd == " Connected " :
# Set up the trade storage
async_start ( self . send_msgs ( [
{ # We want to know about other clients' Set commands for locking
" cmd " : " SetNotify " ,
" keys " : [ self . trade_storage_team ( ) ] ,
} ,
{
" cmd " : " Set " ,
" key " : self . trade_storage_team ( ) ,
" default " : { TRADE_DATASTORAGE_LOCK : 0 } ,
" operations " : [ { " operation " : " default " , " value " : None } ] # value is ignored
}
] ) )
self . difficulty = args [ " slot_data " ] [ " game_difficulty " ]
self . game_speed = args [ " slot_data " ] . get ( " game_speed " , GameSpeed . option_default )
self . disable_forced_camera = args [ " slot_data " ] . get ( " disable_forced_camera " , DisableForcedCamera . default )
self . skip_cutscenes = args [ " slot_data " ] . get ( " skip_cutscenes " , SkipCutscenes . default )
self . all_in_choice = args [ " slot_data " ] [ " all_in_map " ]
self . slot_data_version = args [ " slot_data " ] . get ( " version " , 2 )
self . _apply_host_settings_to_options ( )
if self . slot_data_version < 4 :
# Maintaining backwards compatibility with older slot data
slot_req_table : dict = args [ " slot_data " ] [ " mission_req " ]
first_item = list ( slot_req_table . keys ( ) ) [ 0 ]
if first_item in [ str ( campaign . id ) for campaign in SC2Campaign ] :
# Multi-campaign
mission_req_table = { }
for campaign_id in slot_req_table :
campaign = lookup_id_to_campaign [ int ( campaign_id ) ]
mission_req_table [ campaign ] = {
mission : self . parse_mission_info ( mission_info )
for mission , mission_info in slot_req_table [ campaign_id ] . items ( )
}
else :
# Old format
mission_req_table = { SC2Campaign . GLOBAL : {
mission : self . parse_mission_info ( mission_info )
for mission , mission_info in slot_req_table . items ( )
}
}
self . custom_mission_order = self . parse_mission_req_table ( mission_req_table )
if self . slot_data_version > = 4 :
self . custom_mission_order = [
CampaignSlotData (
* * { field : value for field , value in campaign_data . items ( ) if field not in [ " layouts " , " entry_rule " ] } ,
entry_rule = SubRuleRuleData . parse_from_dict ( campaign_data [ " entry_rule " ] ) ,
layouts = [
LayoutSlotData (
* * { field : value for field , value in layout_data . items ( ) if field not in [ " missions " , " entry_rule " ] } ,
entry_rule = SubRuleRuleData . parse_from_dict ( layout_data [ " entry_rule " ] ) ,
missions = [
[
MissionSlotData (
* * { field : value for field , value in mission_data . items ( ) if field != " entry_rule " } ,
entry_rule = SubRuleRuleData . parse_from_dict ( mission_data [ " entry_rule " ] )
) for mission_data in column
] for column in layout_data [ " missions " ]
]
) for layout_data in campaign_data [ " layouts " ]
]
) for campaign_data in args [ " slot_data " ] [ " custom_mission_order " ]
]
self . mission_id_to_entry_rules = {
mission . mission_id : MissionEntryRules ( mission . entry_rule , layout . entry_rule , campaign . entry_rule )
for campaign in self . custom_mission_order for layout in campaign . layouts
for column in layout . missions for mission in column
}
self . mission_order = args [ " slot_data " ] . get ( " mission_order " , MissionOrder . option_vanilla )
if self . slot_data_version < 4 :
self . final_mission_ids = [ args [ " slot_data " ] . get ( " final_mission " , SC2Mission . ALL_IN . id ) ]
else :
self . final_mission_ids = args [ " slot_data " ] . get ( " final_mission_ids " , [ SC2Mission . ALL_IN . id ] )
self . final_locations = [ get_location_id ( mission_id , 0 ) for mission_id in self . final_mission_ids ]
self . player_color_raynor = _remap_color_option (
self . slot_data_version ,
args [ " slot_data " ] . get ( " player_color_terran_raynor " , ColorChoice . option_blue )
)
self . player_color_zerg = _remap_color_option (
self . slot_data_version ,
args [ " slot_data " ] . get ( " player_color_zerg " , ColorChoice . option_orange )
)
self . player_color_zerg_primal = _remap_color_option (
self . slot_data_version ,
args [ " slot_data " ] . get ( " player_color_zerg_primal " , ColorChoice . option_purple )
)
self . player_color_protoss = _remap_color_option (
self . slot_data_version ,
args [ " slot_data " ] . get ( " player_color_protoss " , ColorChoice . option_blue )
)
self . player_color_nova = _remap_color_option (
self . slot_data_version ,
args [ " slot_data " ] . get ( " player_color_nova " , ColorChoice . option_dark_grey )
)
self . war_council_nerfs = args [ " slot_data " ] . get ( " war_council_nerfs " , WarCouncilNerfs . option_false )
self . mercenary_highlanders = args [ " slot_data " ] . get ( " mercenary_highlanders " , MercenaryHighlanders . option_false )
self . generic_upgrade_missions = args [ " slot_data " ] . get ( " generic_upgrade_missions " , GenericUpgradeMissions . default )
self . max_upgrade_level = args [ " slot_data " ] . get ( " max_upgrade_level " , MaxUpgradeLevel . default )
self . generic_upgrade_items = args [ " slot_data " ] . get ( " generic_upgrade_items " , GenericUpgradeItems . option_individual_items )
self . generic_upgrade_research = args [ " slot_data " ] . get ( " generic_upgrade_research " , GenericUpgradeResearch . option_vanilla )
self . generic_upgrade_research_speedup = args [ " slot_data " ] . get ( " generic_upgrade_research_speedup " , GenericUpgradeResearchSpeedup . default )
self . kerrigan_presence = args [ " slot_data " ] . get ( " kerrigan_presence " , KerriganPresence . option_vanilla )
self . kerrigan_primal_status = args [ " slot_data " ] . get ( " kerrigan_primal_status " , KerriganPrimalStatus . option_vanilla )
self . kerrigan_levels_per_mission_completed = args [ " slot_data " ] . get ( " kerrigan_levels_per_mission_completed " , 0 )
self . kerrigan_levels_per_mission_completed_cap = args [ " slot_data " ] . get ( " kerrigan_levels_per_mission_completed_cap " , - 1 )
self . kerrigan_total_level_cap = args [ " slot_data " ] . get ( " kerrigan_total_level_cap " , - 1 )
self . enable_morphling = args [ " slot_data " ] . get ( " enable_morphling " , EnableMorphling . option_false )
self . grant_story_tech = args [ " slot_data " ] . get ( " grant_story_tech " , GrantStoryTech . option_no_grant )
self . grant_story_levels = args [ " slot_data " ] . get ( " grant_story_levels " , GrantStoryLevels . option_additive )
self . required_tactics = args [ " slot_data " ] . get ( " required_tactics " , RequiredTactics . option_standard )
self . take_over_ai_allies = args [ " slot_data " ] . get ( " take_over_ai_allies " , TakeOverAIAllies . option_false )
self . spear_of_adun_presence = args [ " slot_data " ] . get ( " spear_of_adun_presence " , SpearOfAdunPresence . option_not_present )
self . spear_of_adun_present_in_no_build = args [ " slot_data " ] . get ( " spear_of_adun_present_in_no_build " , SpearOfAdunPresentInNoBuild . option_false )
if self . slot_data_version < 4 :
self . spear_of_adun_passive_ability_presence = args [ " slot_data " ] . get ( " spear_of_adun_autonomously_cast_ability_presence " , SpearOfAdunPassiveAbilityPresence . option_not_present )
self . spear_of_adun_passive_present_in_no_build = args [ " slot_data " ] . get ( " spear_of_adun_autonomously_cast_present_in_no_build " , SpearOfAdunPassivesPresentInNoBuild . option_false )
else :
self . spear_of_adun_passive_ability_presence = args [ " slot_data " ] . get ( " spear_of_adun_passive_ability_presence " , SpearOfAdunPassiveAbilityPresence . option_not_present )
self . spear_of_adun_passive_present_in_no_build = args [ " slot_data " ] . get ( " spear_of_adun_passive_present_in_no_build " , SpearOfAdunPassivesPresentInNoBuild . option_false )
self . minerals_per_item = args [ " slot_data " ] . get ( " minerals_per_item " , 15 )
self . vespene_per_item = args [ " slot_data " ] . get ( " vespene_per_item " , 15 )
self . starting_supply_per_item = args [ " slot_data " ] . get ( " starting_supply_per_item " , 2 )
self . maximum_supply_per_item = args [ " slot_data " ] . get ( " maximum_supply_per_item " , options . MaximumSupplyPerItem . default )
self . maximum_supply_reduction_per_item = args [ " slot_data " ] . get ( " maximum_supply_reduction_per_item " , options . MaximumSupplyReductionPerItem . default )
self . lowest_maximum_supply = args [ " slot_data " ] . get ( " lowest_maximum_supply " , options . LowestMaximumSupply . default )
self . research_cost_reduction_per_item = args [ " slot_data " ] . get ( " research_cost_reduction_per_item " , options . ResearchCostReductionPerItem . default )
self . use_nova_wol_fallback = args [ " slot_data " ] . get ( " use_nova_wol_fallback " , True )
if self . slot_data_version < 4 :
self . use_nova_nco_fallback = args [ " slot_data " ] . get ( " nova_covert_ops_only " , False ) and self . mission_order == MissionOrder . option_vanilla
else :
self . use_nova_nco_fallback = args [ " slot_data " ] . get ( " use_nova_nco_fallback " , False )
self . trade_enabled = args [ " slot_data " ] . get ( " enable_void_trade " , EnableVoidTrade . option_false )
self . trade_age_limit = args [ " slot_data " ] . get ( " void_trade_age_limit " , VoidTradeAgeLimit . default )
self . trade_workers_allowed = args [ " slot_data " ] . get ( " void_trade_workers " , VoidTradeWorkers . default )
self . difficulty_damage_modifier = args [ " slot_data " ] . get ( " difficulty_damage_modifier " , DifficultyDamageModifier . option_true )
self . mission_order_scouting = args [ " slot_data " ] . get ( " mission_order_scouting " , MissionOrderScouting . option_none )
self . mission_item_classification = args [ " slot_data " ] . get ( " mission_item_classification " )
if self . required_tactics == RequiredTactics . option_no_logic :
# Locking Grant Story Tech/Levels if no logic
self . grant_story_tech = GrantStoryTech . option_grant
self . grant_story_levels = GrantStoryLevels . option_minimum
self . location_inclusions = {
LocationType . VICTORY : LocationInclusion . option_enabled , # Victory checks are always enabled
LocationType . VICTORY_CACHE : LocationInclusion . option_enabled , # Victory checks are always enabled
LocationType . VANILLA : args [ " slot_data " ] . get ( " vanilla_locations " , VanillaLocations . default ) ,
LocationType . EXTRA : args [ " slot_data " ] . get ( " extra_locations " , ExtraLocations . default ) ,
LocationType . CHALLENGE : args [ " slot_data " ] . get ( " challenge_locations " , ChallengeLocations . default ) ,
LocationType . MASTERY : args [ " slot_data " ] . get ( " mastery_locations " , MasteryLocations . default ) ,
}
self . location_inclusions_by_flag = {
LocationFlag . SPEEDRUN : args [ " slot_data " ] . get ( " speedrun_locations " , SpeedrunLocations . default ) ,
LocationFlag . PREVENTATIVE : args [ " slot_data " ] . get ( " preventative_locations " , PreventativeLocations . default ) ,
}
self . plando_locations = args [ " slot_data " ] . get ( " plando_locations " , [ ] )
self . build_location_to_mission_mapping ( )
# Looks for the required maps and mods for SC2. Runs check_game_install_path.
maps_present = is_mod_installed_correctly ( )
if os . path . exists ( get_metadata_file ( ) ) :
with open ( get_metadata_file ( ) , " r " ) as f :
current_ver = f . read ( )
sc2_logger . debug ( f " Current version: { current_ver } " )
if is_mod_update_available ( DATA_REPO_OWNER , DATA_REPO_NAME , DATA_API_VERSION , current_ver ) :
(
ColouredMessage ( ) . coloured ( " NOTICE: Update for required files found. " , colour = " red " )
( " Run " ) . coloured ( " /download_data " , colour = " slateblue " )
( " to install. " )
) . send ( self )
self . data_out_of_date = True
elif maps_present :
(
ColouredMessage ( )
. coloured ( " NOTICE: Your map files may be outdated (version number not found). " , colour = " red " )
( " Run " ) . coloured ( " /download_data " , colour = " slateblue " )
( " to install. " )
) . send ( self )
self . data_out_of_date = True
ColouredMessage ( " [b]Check the Launcher tab to start playing.[/b] " , keep_markup = True ) . send ( self )
elif cmd == " SetReply " :
# Currently can only be Void Trade reply
self . trade_latest_reply = args
self . trade_reply_event . set ( )
@staticmethod
def parse_mission_info ( mission_info : dict [ str , typing . Any ] ) - > MissionInfo :
if mission_info . get ( " id " ) is not None :
mission_info [ " mission " ] = lookup_id_to_mission [ mission_info [ " id " ] ]
elif isinstance ( mission_info [ " mission " ] , int ) :
mission_info [ " mission " ] = lookup_id_to_mission [ mission_info [ " mission " ] ]
return MissionInfo (
* * { field : value for field , value in mission_info . items ( ) if field in MissionInfo . _fields }
)
@staticmethod
def parse_mission_req_table ( mission_req_table : typing . Dict [ SC2Campaign , typing . Dict [ typing . Any , MissionInfo ] ] ) - > typing . List [ CampaignSlotData ] :
campaigns : typing . List [ typing . Tuple [ int , CampaignSlotData ] ] = [ ]
rolling_rule_id = 0
for ( campaign , campaign_data ) in mission_req_table . items ( ) :
if campaign . campaign_name == " Global " :
campaign_name = " "
else :
campaign_name = campaign . campaign_name
categories : typing . Dict [ str , typing . List [ MissionSlotData ] ] = { }
for mission in campaign_data . values ( ) :
if mission . category not in categories :
categories [ mission . category ] = [ ]
mission_id = mission . mission . id
sub_rules : typing . List [ CountMissionsRuleData ] = [ ]
missions : typing . List [ int ]
if mission . number :
amount = mission . number
missions = [
mission . mission . id
for mission in mission_req_table [ campaign ] . values ( )
]
sub_rules . append ( CountMissionsRuleData ( missions , amount , [ campaign_name ] ) )
prev_missions : typing . List [ int ] = [ ]
if len ( mission . required_world ) > 0 :
missions = [ ]
for connection in mission . required_world :
if isinstance ( connection , dict ) :
required_campaign = { }
for camp , camp_data in mission_req_table . items ( ) :
if camp . id == connection [ " campaign " ] :
required_campaign = camp_data
break
required_mission_id = connection [ " connect_to " ]
else :
required_campaign = mission_req_table [ connection . campaign ]
required_mission_id = connection . connect_to
required_mission = list ( required_campaign . values ( ) ) [ required_mission_id - 1 ]
missions . append ( required_mission . mission . id )
if required_mission . category == mission . category :
prev_missions . append ( required_mission . mission . id )
if mission . or_requirements :
amount = 1
else :
amount = len ( missions )
sub_rules . append ( CountMissionsRuleData ( missions , amount , missions ) )
entry_rule = SubRuleRuleData ( rolling_rule_id , sub_rules , len ( sub_rules ) )
rolling_rule_id + = 1
categories [ mission . category ] . append ( MissionSlotData . legacy ( mission_id , prev_missions , entry_rule ) )
layouts : typing . List [ LayoutSlotData ] = [ ]
for ( layout , mission_slots ) in categories . items ( ) :
if layout . startswith ( " _ " ) :
layout_name = " "
else :
layout_name = layout
layouts . append ( LayoutSlotData . legacy ( layout_name , [ mission_slots ] ) )
campaigns . append ( ( campaign . id , CampaignSlotData . legacy ( campaign_name , layouts ) ) )
return [ data for ( _ , data ) in sorted ( campaigns ) ]
def on_print_json ( self , args : dict ) - > None :
# goes to this world
if " receiving " in args and self . slot_concerns_self ( args [ " receiving " ] ) :
relevant = True
# found in this world
elif " item " in args and self . slot_concerns_self ( args [ " item " ] . player ) :
relevant = True
# not related
else :
relevant = False
if relevant :
self . announcements . put ( self . raw_text_parser ( copy . deepcopy ( args [ " data " ] ) ) )
super ( SC2Context , self ) . on_print_json ( args )
def run_gui ( self ) - > None :
from . client_gui import start_gui
start_gui ( self )
async def shutdown ( self ) - > None :
await super ( SC2Context , self ) . shutdown ( )
if self . last_bot :
self . last_bot . want_close = True
# If the client is not set up yet, the game is not done loading and must be force-closed
if not hasattr ( self . last_bot , " client " ) :
bot . sc2process . kill_switch . kill_all ( )
if self . sc2_run_task :
self . sc2_run_task . cancel ( )
async def disconnect ( self , allow_autoreconnect : bool = False ) :
self . finished_game = False
await super ( SC2Context , self ) . disconnect ( allow_autoreconnect = allow_autoreconnect )
def play_mission ( self , mission_id : int ) - > bool :
if self . missions_unlocked or is_mission_available ( self , mission_id ) :
if self . sc2_run_task :
if not self . sc2_run_task . done ( ) :
sc2_logger . warning ( " Starcraft 2 Client is still running! " )
self . sc2_run_task . cancel ( ) # doesn't actually close the game, just stops the python task
if self . slot is None :
sc2_logger . warning ( " Launching Mission without Archipelago authentication, "
" checks will not be registered to server. " )
self . sc2_run_task = asyncio . create_task ( starcraft_launch ( self , mission_id ) ,
name = " Starcraft 2 Launch " )
return True
else :
sc2_logger . info ( f " { lookup_id_to_mission [ mission_id ] . mission_name } is not currently unlocked. " )
return False
def build_location_to_mission_mapping ( self ) - > None :
mission_id_to_location_ids : typing . Dict [ int , typing . Set [ int ] ] = {
mission . mission_id : set ( )
for campaign in self . custom_mission_order for layout in campaign . layouts
for column in layout . missions for mission in column
}
for loc in self . server_locations :
offset = (
SC2WOL_LOC_ID_OFFSET
if loc < SC2HOTS_LOC_ID_OFFSET
else ( SC2HOTS_LOC_ID_OFFSET - SC2Mission . ALL_IN . id * VICTORY_MODULO )
)
mission_id , objective = divmod ( loc - offset , VICTORY_MODULO )
mission_id_to_location_ids [ mission_id ] . add ( objective )
self . mission_id_to_location_ids = {
mission_id : sorted ( objectives )
for mission_id , objectives in mission_id_to_location_ids . items ( )
}
def locations_for_mission ( self , mission : SC2Mission ) - > typing . Iterable [ int ] :
mission_id : int = mission . id
objectives = self . mission_id_to_location_ids [ mission_id ]
for objective in objectives :
yield get_location_id ( mission_id , objective )
def locations_for_mission_id ( self , mission_id : int ) - > typing . Iterable [ int ] :
objectives = self . mission_id_to_location_ids [ mission_id ]
for objective in objectives :
yield get_location_id ( mission_id , objective )
def uncollected_locations_in_mission ( self , mission : SC2Mission ) - > typing . Iterable [ int ] :
for location_id in self . locations_for_mission ( mission ) :
if location_id in self . missing_locations :
yield location_id
def is_mission_completed ( self , mission_id : int ) - > bool :
return get_location_id ( mission_id , 0 ) in self . checked_locations
async def trade_acquire_storage ( self , keep_trying : bool = False ) - > typing . Optional [ dict ] :
# This function was largely taken from the Pokemon Emerald client
"""
Acquires a lock on the Void Trade DataStorage .
Locking the key means you have exclusive access
to modifying the value until you unlock it or the key expires ( 5 seconds ) .
If ` keep_trying ` is ` True ` , it will keep trying to acquire the lock
until successful . Otherwise it will return ` None ` if it fails to
acquire the lock .
"""
while not self . exit_event . is_set ( ) and self . last_bot and self . last_bot . game_running :
lock = int ( time . time_ns ( ) / 1000000000 ) # in seconds
# Make sure we're not past the waiting limit
# SC2 needs to be notified within 10 minutes of game time (training time of the dummy units)
if self . trade_lock_start is not None :
if self . last_bot . time - self . trade_lock_start > = TRADE_LOCK_WAIT_LIMIT :
self . trade_lock_wait = 0
self . trade_lock_start = None
return None
elif keep_trying :
self . trade_lock_start = self . last_bot . time
message_uuid = str ( uuid . uuid4 ( ) )
await self . send_msgs ( [ {
" cmd " : " Set " ,
" key " : self . trade_storage_team ( ) ,
" default " : { TRADE_DATASTORAGE_LOCK : 0 } ,
" want_reply " : True ,
" operations " : [ { " operation " : " update " , " value " : { TRADE_DATASTORAGE_LOCK : lock } } ] ,
" uuid " : message_uuid ,
} ] )
self . trade_reply_event . clear ( )
try :
await asyncio . wait_for ( self . trade_reply_event . wait ( ) , 5 )
except asyncio . TimeoutError :
if not keep_trying :
return None
continue
assert self . trade_latest_reply is not None
reply = copy . deepcopy ( self . trade_latest_reply )
# Make sure the most recently received update was triggered by our lock attempt
if reply . get ( " uuid " , None ) != message_uuid :
if not keep_trying :
return None
await asyncio . sleep ( TRADE_LOCK_TIME )
continue
# Make sure the current value of the lock is what we set it to
# (I think this should theoretically never run)
if reply [ " value " ] [ TRADE_DATASTORAGE_LOCK ] != lock :
if not keep_trying :
return None
await asyncio . sleep ( TRADE_LOCK_TIME )
continue
# Make sure that the lock value we replaced is at least 5 seconds old
# If it was unlocked before our change, its value was 0 and it will look decades old
if lock - reply [ " original_value " ] [ TRADE_DATASTORAGE_LOCK ] < TRADE_LOCK_TIME :
if not keep_trying :
return None
# Multiple clients trying to lock the key may get stuck in a loop of checking the lock
# by trying to set it, which will extend its expiration. So if we see that the lock was
# too new when we replaced it, we should wait for increasingly longer periods so that
# eventually the lock will expire and a client will acquire it.
self . trade_lock_wait + = TRADE_LOCK_TIME
self . trade_lock_wait + = random . randrange ( 100 , 500 ) / 1000
await asyncio . sleep ( self . trade_lock_wait )
continue
# We have the lock, reset the waiting period and return
self . trade_lock_wait = 0
self . trade_lock_start = None
return reply
return None
async def trade_receive ( self , amount : int = 1 ) :
"""
Tries to pop ` amount ` units out of the trade storage .
"""
reply = await self . trade_acquire_storage ( True )
if reply is None :
self . trade_response = " ?TradeFail Void Trade failed: Could not communicate with server. Trade cost refunded. "
return None
# Find available units
# Ignore units we sent ourselves
allowed_slots : typing . List [ str ] = [
slot for slot in reply [ " value " ]
if slot != TRADE_DATASTORAGE_LOCK \
and slot != self . trade_storage_slot ( )
]
# Filter out trades that are too old
if self . trade_age_limit != VoidTradeAgeLimit . option_disabled :
trade_time = reply [ " value " ] [ TRADE_DATASTORAGE_LOCK ]
allowed_age = void_trade_age_limits_ms [ self . trade_age_limit ]
is_young_enough = lambda send_time : trade_time - send_time < = allowed_age
else :
is_young_enough = lambda _ : True
# Filter out banned units
if self . trade_workers_allowed == VoidTradeWorkers . option_false :
is_unit_allowed = lambda unit : unit not in worker_units
else :
is_unit_allowed = lambda _ : True
available_units : typing . List [ typing . Tuple [ str , str , int ] ] = [ ]
available_counts : typing . List [ int ] = [ ]
for slot in allowed_slots :
for ( send_time , units ) in reply [ " value " ] [ slot ] . items ( ) :
if is_young_enough ( int ( send_time ) ) :
for ( unit , count ) in units . items ( ) :
if is_unit_allowed ( str ( unit ) ) :
available_units . append ( ( unit , slot , send_time ) )
available_counts . append ( count )
# Pick units to receive
# If there's not enough units in total, just pick as many as possible
# SC2 should handle the refund
available = sum ( available_counts )
refunds = 0
if available < amount :
refunds = amount - available
amount = available
if available == 0 :
# random.sample crashes if counts is an empty list
units = [ ]
else :
units = random . sample ( available_units , amount , counts = available_counts )
# Build response data
unit_counts : typing . Dict [ str , int ] = { }
slots_to_update : typing . Dict [ str , typing . Dict [ int , typing . Dict [ str , int ] ] ] = { }
for ( unit , slot , send_time ) in units :
unit_counts [ unit ] = unit_counts . get ( unit , 0 ) + 1
if slot not in slots_to_update :
slots_to_update [ slot ] = copy . deepcopy ( reply [ " value " ] [ slot ] )
slots_to_update [ slot ] [ send_time ] [ unit ] - = 1
# Clean up units that were removed completely
if slots_to_update [ slot ] [ send_time ] [ unit ] == 0 :
slots_to_update [ slot ] [ send_time ] . pop ( unit )
# Clean up trades that were completely exhausted
if len ( slots_to_update [ slot ] [ send_time ] ) == 0 :
slots_to_update [ slot ] . pop ( send_time )
await self . send_msgs ( [
{ # Update server storage
" cmd " : " Set " ,
" key " : self . trade_storage_team ( ) ,
" operations " : [ { " operation " : " update " , " value " : slots_to_update } ]
} ,
{ # Release the lock
" cmd " : " Set " ,
" key " : self . trade_storage_team ( ) ,
" operations " : [ { " operation " : " update " , " value " : { TRADE_DATASTORAGE_LOCK : 0 } } ]
}
] )
# Give units to bot
self . trade_response = f " ?Trade { refunds } " + " " . join ( f " { unit } { count } " for ( unit , count ) in unit_counts . items ( ) )
async def trade_send ( self , units : typing . List [ str ] ) :
"""
Tries to upload ` units ` to the trade DataStorage .
"""
reply = await self . trade_acquire_storage ( True )
if reply is None :
self . trade_response = " ?TradeFail Void Trade failed: Could not communicate with server. Your units remain. "
return None
# Create a storage entry for the time the trade was confirmed
trade_time = reply [ " value " ] [ TRADE_DATASTORAGE_LOCK ]
storage_entry = { }
for unit in units :
storage_entry [ unit ] = storage_entry . get ( unit , 0 ) + 1
# Update the storage with the new units
data : typing . Dict [ int , typing . Dict [ str , int ] ] = copy . deepcopy ( reply [ " value " ] . get ( self . trade_storage_slot ( ) , { } ) )
data [ trade_time ] = storage_entry
await self . send_msgs ( [
{ # Send the updated data
" cmd " : " Set " ,
" key " : self . trade_storage_team ( ) ,
" operations " : [ { " operation " : " update " , " value " : { self . trade_storage_slot ( ) : data } } ]
} ,
{ # Release the lock
" cmd " : " Set " ,
" key " : self . trade_storage_team ( ) ,
" operations " : [ { " operation " : " update " , " value " : { TRADE_DATASTORAGE_LOCK : 0 } } ]
}
] )
# Notify the game
self . trade_response = " ?TradeSuccess Void Trade successful: Units sent! "
class CompatItemHolder ( typing . NamedTuple ) :
name : str
quantity : int = 1
2025-09-30 09:34:26 -07:00
async def main ( args : typing . Sequence [ str ] | None ) :
2025-09-02 17:40:58 +02:00
multiprocessing . freeze_support ( )
parser = get_base_parser ( )
parser . add_argument ( ' --name ' , default = None , help = " Slot Name to connect as. " )
2025-09-30 09:34:26 -07:00
args , uri = parser . parse_known_args ( args )
2025-09-02 17:40:58 +02:00
if uri and uri [ 0 ] . startswith ( ' archipelago:// ' ) :
2025-09-30 09:34:26 -07:00
args . url = uri [ 0 ]
handle_url_arg ( args , parser )
2025-09-02 17:40:58 +02:00
ctx = SC2Context ( args . connect , args . password )
ctx . auth = args . name
if ctx . server_task is None :
ctx . server_task = asyncio . create_task ( server_loop ( ctx ) , name = " ServerLoop " )
if gui_enabled :
ctx . run_gui ( )
ctx . run_cli ( )
await ctx . exit_event . wait ( )
await ctx . shutdown ( )
# These items must be given to the player if the game is generated on older versions
API2_TO_API3_COMPAT_ITEMS : typing . Set [ CompatItemHolder ] = {
CompatItemHolder ( item_names . PHOTON_CANNON ) ,
CompatItemHolder ( item_names . OBSERVER ) ,
CompatItemHolder ( item_names . WARP_HARMONIZATION ) ,
CompatItemHolder ( item_names . PROGRESSIVE_PROTOSS_WEAPON_ARMOR_UPGRADE , 3 )
}
API3_TO_API4_COMPAT_ITEMS : typing . Set [ CompatItemHolder ] = {
# War Council
CompatItemHolder ( item_names . ZEALOT_WHIRLWIND ) ,
CompatItemHolder ( item_names . CENTURION_RESOURCE_EFFICIENCY ) ,
CompatItemHolder ( item_names . SENTINEL_RESOURCE_EFFICIENCY ) ,
CompatItemHolder ( item_names . STALKER_PHASE_REACTOR ) ,
CompatItemHolder ( item_names . DRAGOON_PHALANX_SUIT ) ,
CompatItemHolder ( item_names . INSTIGATOR_MODERNIZED_SERVOS ) ,
CompatItemHolder ( item_names . ADEPT_DISRUPTIVE_TRANSFER ) ,
CompatItemHolder ( item_names . SLAYER_PHASE_BLINK ) ,
CompatItemHolder ( item_names . AVENGER_KRYHAS_CLOAK ) ,
CompatItemHolder ( item_names . DARK_TEMPLAR_LESSER_SHADOW_FURY ) ,
CompatItemHolder ( item_names . DARK_TEMPLAR_GREATER_SHADOW_FURY ) ,
CompatItemHolder ( item_names . BLOOD_HUNTER_BRUTAL_EFFICIENCY ) ,
CompatItemHolder ( item_names . SENTRY_DOUBLE_SHIELD_RECHARGE ) ,
CompatItemHolder ( item_names . ENERGIZER_MOBILE_CHRONO_BEAM ) ,
CompatItemHolder ( item_names . HAVOC_ENDURING_SIGHT ) ,
CompatItemHolder ( item_names . HIGH_TEMPLAR_PLASMA_SURGE ) ,
CompatItemHolder ( item_names . SIGNIFIER_FEEDBACK ) ,
CompatItemHolder ( item_names . ASCENDANT_BREATH_OF_CREATION ) ,
CompatItemHolder ( item_names . DARK_ARCHON_INDOMITABLE_WILL ) ,
CompatItemHolder ( item_names . IMMORTAL_IMPROVED_BARRIER ) ,
CompatItemHolder ( item_names . VANGUARD_RAPIDFIRE_CANNON ) ,
CompatItemHolder ( item_names . VANGUARD_FUSION_MORTARS ) ,
CompatItemHolder ( item_names . ANNIHILATOR_TWILIGHT_CHASSIS ) ,
CompatItemHolder ( item_names . COLOSSUS_FIRE_LANCE ) ,
CompatItemHolder ( item_names . WRATHWALKER_AERIAL_TRACKING ) ,
CompatItemHolder ( item_names . REAVER_KHALAI_REPLICATORS ) ,
CompatItemHolder ( item_names . PHOENIX_DOUBLE_GRAVITON_BEAM ) ,
CompatItemHolder ( item_names . CORSAIR_NETWORK_DISRUPTION ) ,
CompatItemHolder ( item_names . MIRAGE_GRAVITON_BEAM ) ,
CompatItemHolder ( item_names . VOID_RAY_PRISMATIC_RANGE ) ,
CompatItemHolder ( item_names . CARRIER_REPAIR_DRONES ) ,
CompatItemHolder ( item_names . TEMPEST_DISINTEGRATION ) ,
CompatItemHolder ( item_names . ARBITER_VESSEL_OF_THE_CONCLAVE ) ,
CompatItemHolder ( item_names . MOTHERSHIP_INTEGRATED_POWER ) ,
# Other items
CompatItemHolder ( item_names . ASCENDANT_ARCHON_MERGE ) ,
CompatItemHolder ( item_names . DARK_TEMPLAR_ARCHON_MERGE ) ,
CompatItemHolder ( item_names . SPORE_CRAWLER_BIO_BONUS ) ,
}
def compat_item_to_network_items ( compat_item : CompatItemHolder ) - > typing . List [ NetworkItem ] :
item_id = get_full_item_list ( ) [ compat_item . name ] . code
network_item = NetworkItem ( item_id , 0 , 0 , 0 )
return compat_item . quantity * [ network_item ]
def calculate_items ( ctx : SC2Context ) - > typing . Dict [ SC2Race , typing . List [ int ] ] :
items = ctx . items_received . copy ( )
item_list = get_full_item_list ( )
def create_network_item ( item_name : str ) - > NetworkItem :
return NetworkItem ( item_list [ item_name ] . code , 0 , 0 , 0 )
# Items unlocked in earlier generator versions by default (Prophecy defaults, war council, rebalances)
if ctx . slot_data_version < 3 :
for compat_item in API2_TO_API3_COMPAT_ITEMS :
items . extend ( compat_item_to_network_items ( compat_item ) )
if ctx . slot_data_version < 4 :
for compat_item in API3_TO_API4_COMPAT_ITEMS :
items . extend ( compat_item_to_network_items ( compat_item ) )
received_item_ids = set ( item . item for item in ctx . items_received )
if item_list [ item_names . GHOST_RESOURCE_EFFICIENCY ] . code in received_item_ids :
items . append ( create_network_item ( item_names . GHOST_BARGAIN_BIN_PRICES ) )
if item_list [ item_names . SPECTRE_RESOURCE_EFFICIENCY ] . code in received_item_ids :
items . append ( create_network_item ( item_names . SPECTRE_BARGAIN_BIN_PRICES ) )
if item_list [ item_names . ROGUE_FORCES ] . code in received_item_ids :
items . append ( create_network_item ( item_names . UNRESTRICTED_MUTATION ) )
if item_list [ item_names . SCOUT_RESOURCE_EFFICIENCY ] . code in received_item_ids :
items . append ( create_network_item ( item_names . SCOUT_SUPPLY_EFFICIENCY ) )
if item_list [ item_names . REAVER_RESOURCE_EFFICIENCY ] . code in received_item_ids :
items . append ( create_network_item ( item_names . REAVER_BARGAIN_BIN_PRICES ) )
# API < 4 Orbital Command Count (Deprecated item)
orbital_command_count : int = 0
network_item : NetworkItem
accumulators : typing . Dict [ SC2Race , typing . List [ int ] ] = {
race : [ 0 for element in item_type_enum_class if element . flag_word > = 0 ]
for race , item_type_enum_class in race_to_item_type . items ( )
}
# Protoss Shield grouped item specific logic
shields_from_ground_upgrade : int = 0
shields_from_air_upgrade : int = 0
for network_item in items :
name = lookup_id_to_name . get ( network_item . item )
if name is None :
continue
item_data : ItemData = item_list [ name ]
if item_data . type . flag_word < 0 :
continue
# exists exactly once
if item_data . quantity == 1 or name in item_name_groups [ ItemGroupNames . UNRELEASED_ITEMS ] :
accumulators [ item_data . race ] [ item_data . type . flag_word ] | = 1 << item_data . number
# exists multiple times
elif item_data . quantity > 1 :
flaggroup = item_data . type . flag_word
# Generic upgrades apply only to Weapon / Armor upgrades
if item_data . number > = 0 :
accumulators [ item_data . race ] [ flaggroup ] + = 1 << item_data . number
else :
if name == item_names . PROGRESSIVE_PROTOSS_GROUND_UPGRADE :
shields_from_ground_upgrade + = 1
if name == item_names . PROGRESSIVE_PROTOSS_AIR_UPGRADE :
shields_from_air_upgrade + = 1
for bundled_number in get_bundle_upgrade_member_numbers ( name ) :
accumulators [ item_data . race ] [ flaggroup ] + = 1 << bundled_number
# Regen bio-steel nerf with API3 - undo for older games
if ctx . slot_data_version < 3 and name == item_names . PROGRESSIVE_REGENERATIVE_BIO_STEEL :
current_level = ( accumulators [ item_data . race ] [ flaggroup ] >> item_data . number ) % 4
if current_level == 2 :
# Switch from level 2 to level 3 for compatibility
accumulators [ item_data . race ] [ flaggroup ] + = 1 << item_data . number
# sum
# Fillers, deprecated items
else :
if name == item_names . PROGRESSIVE_ORBITAL_COMMAND :
orbital_command_count + = 1
elif item_data . type == ZergItemType . Level :
accumulators [ item_data . race ] [ item_data . type . flag_word ] + = item_data . number
elif name == item_names . STARTING_MINERALS :
accumulators [ item_data . race ] [ item_data . type . flag_word ] + = ctx . minerals_per_item
elif name == item_names . STARTING_VESPENE :
accumulators [ item_data . race ] [ item_data . type . flag_word ] + = ctx . vespene_per_item
elif name == item_names . STARTING_SUPPLY :
accumulators [ item_data . race ] [ item_data . type . flag_word ] + = ctx . starting_supply_per_item
elif name == item_names . UPGRADE_RESEARCH_COST :
accumulators [ item_data . race ] [ item_data . type . flag_word ] + = ctx . research_cost_reduction_per_item
else :
accumulators [ item_data . race ] [ item_data . type . flag_word ] + = 1
# Fix Shields from generic upgrades by unit class (Maximum of ground/air upgrades)
if shields_from_ground_upgrade > 0 or shields_from_air_upgrade > 0 :
shield_upgrade_level = max ( shields_from_ground_upgrade , shields_from_air_upgrade )
shield_upgrade_item = item_list [ item_names . PROGRESSIVE_PROTOSS_SHIELDS ]
for _ in range ( 0 , shield_upgrade_level ) :
accumulators [ shield_upgrade_item . race ] [ shield_upgrade_item . type . flag_word ] + = 1 << shield_upgrade_item . number
# Deprecated Orbital Command handling (Backwards compatibility):
if orbital_command_count > 0 :
orbital_command_replacement_items : typing . List [ str ] = [
item_names . COMMAND_CENTER_SCANNER_SWEEP ,
item_names . COMMAND_CENTER_MULE ,
item_names . COMMAND_CENTER_EXTRA_SUPPLIES ,
item_names . PLANETARY_FORTRESS_ORBITAL_MODULE
]
replacement_item_ids = [ get_full_item_list ( ) [ item_name ] . code for item_name in orbital_command_replacement_items ]
if sum ( item_id in replacement_item_ids for item_id in items ) > 0 :
logger . warning ( inspect . cleandoc ( """
Both old Orbital Command and its replacements are present in the world . Skipping compatibility handling .
""" ))
else :
# None of replacement items are present
# L1: MULE and Scanner Sweep
scanner_sweep_data = get_full_item_list ( ) [ item_names . COMMAND_CENTER_SCANNER_SWEEP ]
mule_data = get_full_item_list ( ) [ item_names . COMMAND_CENTER_MULE ]
accumulators [ scanner_sweep_data . race ] [ scanner_sweep_data . type . flag_word ] + = 1 << scanner_sweep_data . number
accumulators [ mule_data . race ] [ mule_data . type . flag_word ] + = 1 << mule_data . number
if orbital_command_count > = 2 :
# L2 MULE and Scanner Sweep usable even in Planetary Fortress Mode
planetary_orbital_module_data = get_full_item_list ( ) [ item_names . PLANETARY_FORTRESS_ORBITAL_MODULE ]
accumulators [ planetary_orbital_module_data . race ] [ planetary_orbital_module_data . type . flag_word ] + = \
1 << planetary_orbital_module_data . number
# Upgrades from completed missions
if ctx . generic_upgrade_missions > 0 :
total_missions = sum ( len ( column ) for campaign in ctx . custom_mission_order for layout in campaign . layouts for column in layout . missions )
num_missions = int ( ( ctx . generic_upgrade_missions / 100 ) * total_missions )
completed = len ( [ mission_id for mission_id in ctx . mission_id_to_location_ids if ctx . is_mission_completed ( mission_id ) ] )
upgrade_count = min ( completed / / num_missions , ctx . max_upgrade_level ) if num_missions > 0 else ctx . max_upgrade_level
upgrade_count = min ( upgrade_count , WEAPON_ARMOR_UPGRADE_MAX_LEVEL )
# Equivalent to "Progressive Weapon/Armor Upgrade" item
global_upgrades : typing . Set [ str ] = upgrade_included_names [ GenericUpgradeItems . option_bundle_all ]
for global_upgrade in global_upgrades :
race = get_full_item_list ( ) [ global_upgrade ] . race
upgrade_flaggroup = race_to_item_type [ race ] [ " Upgrade " ] . flag_word
for bundled_number in get_bundle_upgrade_member_numbers ( global_upgrade ) :
accumulators [ race ] [ upgrade_flaggroup ] + = upgrade_count << bundled_number
return accumulators
def get_bundle_upgrade_member_numbers ( bundled_item : str ) - > typing . List [ int ] :
upgrade_elements : typing . List [ str ] = upgrade_bundles [ bundled_item ]
if bundled_item in ( item_names . PROGRESSIVE_PROTOSS_GROUND_UPGRADE , item_names . PROGRESSIVE_PROTOSS_AIR_UPGRADE ) :
# Shields are handled as a maximum of those two
upgrade_elements = [ item_name for item_name in upgrade_elements if item_name != item_names . PROGRESSIVE_PROTOSS_SHIELDS ]
return [ get_full_item_list ( ) [ item_name ] . number for item_name in upgrade_elements ]
def calc_difficulty ( difficulty : int ) :
if difficulty == 0 :
return ' C '
elif difficulty == 1 :
return ' N '
elif difficulty == 2 :
return ' H '
elif difficulty == 3 :
return ' B '
return ' X '
def get_kerrigan_level ( ctx : SC2Context , items : typing . Dict [ SC2Race , typing . List [ int ] ] , missions_beaten : int ) - > int :
item_value = items [ SC2Race . ZERG ] [ ZergItemType . Level . flag_word ]
mission_value = missions_beaten * ctx . kerrigan_levels_per_mission_completed
if ctx . kerrigan_levels_per_mission_completed_cap != - 1 :
mission_value = min ( mission_value , ctx . kerrigan_levels_per_mission_completed_cap )
total_value = item_value + mission_value
if ctx . kerrigan_total_level_cap != - 1 :
total_value = min ( total_value , ctx . kerrigan_total_level_cap )
return total_value
def calculate_kerrigan_options ( ctx : SC2Context ) - > int :
result = 0
# Bits 0, 1
# Kerrigan unit available
if ctx . kerrigan_presence in kerrigan_unit_available :
result | = 1 << 0
# Bit 2
# Kerrigan primal status by map
if ctx . kerrigan_primal_status == KerriganPrimalStatus . option_vanilla :
result | = 1 << 2
return result
def caclulate_soa_options ( ctx : SC2Context , mission : SC2Mission ) - > int :
"""
Pack SOA options into a single integer with bitflags .
0b000011 = SOA presence
0b000100 = SOA in no - builds
0b011000 = Passives presence
0b100000 = PAssives in no - builds
"""
result = 0
# Bits 0, 1
# SoA Calldowns available
soa_presence_value = 0
if is_mission_in_soa_presence ( ctx . spear_of_adun_presence , mission ) :
soa_presence_value = 3
result | = soa_presence_value << 0
# Bit 2
# SoA Calldowns for no-builds
if ctx . spear_of_adun_present_in_no_build == SpearOfAdunPresentInNoBuild . option_true :
result | = 1 << 2
# Bits 3,4
# Autocasts
soa_autocasts_presence_value = 0
if is_mission_in_soa_presence ( ctx . spear_of_adun_passive_ability_presence , mission , SpearOfAdunPassiveAbilityPresence ) :
soa_autocasts_presence_value = 3
# Guardian Shell breaks without SoA on version 4+, but can be generated without SoA on version 3
if ctx . slot_data_version < 4 and MissionFlag . Protoss in mission . flags :
soa_autocasts_presence_value = 3
result | = soa_autocasts_presence_value << 3
# Bit 5
# Autocasts in no-builds
if ctx . spear_of_adun_passive_present_in_no_build == SpearOfAdunPassivesPresentInNoBuild . option_true :
result | = 1 << 5
return result
def calculate_generic_upgrade_options ( ctx : SC2Context ) - > int :
result = 0
# Bits 0,1
# Research mode
research_mode_value = 0
if ctx . generic_upgrade_research == GenericUpgradeResearch . option_vanilla :
research_mode_value = 0
elif ctx . generic_upgrade_research == GenericUpgradeResearch . option_auto_in_no_build :
research_mode_value = 1
elif ctx . generic_upgrade_research == GenericUpgradeResearch . option_auto_in_build :
research_mode_value = 2
elif ctx . generic_upgrade_research == GenericUpgradeResearch . option_always_auto :
research_mode_value = 3
result | = research_mode_value << 0
# Bit 2
# Speedup
if ctx . generic_upgrade_research_speedup == GenericUpgradeResearchSpeedup . option_true :
result | = 1 << 2
return result
def calculate_trade_options ( ctx : SC2Context ) - > int :
result = 0
# Bit 0
# Trade enabled
if ctx . trade_enabled :
result | = 1 << 0
# Bit 1
# Workers allowed
if ctx . trade_workers_allowed == VoidTradeWorkers . option_true :
result | = 1 << 1
return result
def kerrigan_primal ( ctx : SC2Context , kerrigan_level : int ) - > bool :
if ctx . kerrigan_primal_status == KerriganPrimalStatus . option_always_zerg :
return True
elif ctx . kerrigan_primal_status == KerriganPrimalStatus . option_always_human :
return False
elif ctx . kerrigan_primal_status == KerriganPrimalStatus . option_level_35 :
return kerrigan_level > = 35
elif ctx . kerrigan_primal_status == KerriganPrimalStatus . option_half_completion :
total_missions = len ( ctx . mission_id_to_location_ids )
completed = sum ( ctx . is_mission_completed ( mission_id )
for mission_id in ctx . mission_id_to_location_ids )
return completed > = ( total_missions / 2 )
elif ctx . kerrigan_primal_status == KerriganPrimalStatus . option_item :
codes = [ item . item for item in ctx . items_received ]
return get_full_item_list ( ) [ item_names . KERRIGAN_PRIMAL_FORM ] . code in codes
return False
def get_mission_variant ( mission_id : int ) - > int :
mission_flags = lookup_id_to_mission [ mission_id ] . flags
if MissionFlag . RaceSwap not in mission_flags :
return 0
if MissionFlag . Terran in mission_flags :
return 1
elif MissionFlag . Zerg in mission_flags :
return 2
elif MissionFlag . Protoss in mission_flags :
return 3
return 0
def get_item_flag_word ( item_name : str ) - > int :
return get_full_item_list ( ) [ item_name ] . type . flag_word
async def starcraft_launch ( ctx : SC2Context , mission_id : int ) :
sc2_logger . info ( f " Launching { lookup_id_to_mission [ mission_id ] . mission_name } . If game does not launch check log file for errors. " )
with DllDirectory ( None ) :
run_game (
bot . maps . get ( lookup_id_to_mission [ mission_id ] . map_file ) ,
[ Bot ( Race . Terran , ArchipelagoBot ( ctx , mission_id ) , name = " Archipelago " , fullscreen = not SC2World . settings . game_windowed_mode ) ] ,
realtime = True ,
)
class ArchipelagoBot ( bot . bot_ai . BotAI ) :
__slots__ = [
' game_running ' ,
' mission_completed ' ,
' boni ' ,
' setup_done ' ,
' ctx ' ,
' mission_id ' ,
' want_close ' ,
' can_read_game ' ,
' last_received_update ' ,
' last_trade_cargo ' ,
' last_supply_used '
]
ctx : SC2Context
# defined in bot_ai_internal.py; seems to be mis-annotated as a float and later re-annotated as an int
supply_used : int
def __init__ ( self , ctx : SC2Context , mission_id : int ) :
self . game_running = False
self . mission_completed = False
self . want_close = False
self . can_read_game = False
self . last_received_update : int = 0
self . last_trade_cargo : set = set ( )
self . last_supply_used : int = 0
self . trade_reply_cooldown : int = 0
self . setup_done = False
self . ctx = ctx
self . ctx . last_bot = self
self . mission_id = mission_id
self . boni = [ False for _ in range ( MAX_BONUS ) ]
super ( ArchipelagoBot , self ) . __init__ ( )
async def on_step ( self , iteration : int ) :
if self . want_close :
self . want_close = False
await self . _client . leave ( )
return
game_state = 0
if not self . setup_done :
self . setup_done = True
mission = lookup_id_to_mission [ self . mission_id ]
start_items = calculate_items ( self . ctx )
missions_beaten = self . missions_beaten_count ( )
kerrigan_level = get_kerrigan_level ( self . ctx , start_items , missions_beaten )
kerrigan_options = calculate_kerrigan_options ( self . ctx )
soa_options = caclulate_soa_options ( self . ctx , mission )
generic_upgrade_options = calculate_generic_upgrade_options ( self . ctx )
trade_options = calculate_trade_options ( self . ctx )
mission_variant = get_mission_variant ( self . mission_id ) # 0/1/2/3 for unchanged/Terran/Zerg/Protoss
nova_fallback : bool
if MissionFlag . Nova in mission . flags :
nova_fallback = self . ctx . use_nova_nco_fallback
elif MissionFlag . WoLNova in mission . flags :
nova_fallback = self . ctx . use_nova_wol_fallback
else :
nova_fallback = False
uncollected_objectives : typing . List [ int ] = self . get_uncollected_objectives ( )
if self . ctx . difficulty_override > = 0 :
difficulty = calc_difficulty ( self . ctx . difficulty_override )
else :
difficulty = calc_difficulty ( self . ctx . difficulty )
if self . ctx . game_speed_override > = 0 :
game_speed = self . ctx . game_speed_override
else :
game_speed = self . ctx . game_speed
await self . chat_send (
" ?SetOptions "
f " { difficulty } "
f " { generic_upgrade_options } "
f " { self . ctx . all_in_choice } "
f " { game_speed } "
f " { self . ctx . disable_forced_camera } "
f " { self . ctx . skip_cutscenes } "
f " { kerrigan_options } "
f " { self . ctx . grant_story_tech } "
f " { self . ctx . take_over_ai_allies } "
f " { soa_options } "
f " { self . ctx . mission_order } "
f " { int ( nova_fallback ) } "
f " { self . ctx . grant_story_levels } "
f " { self . ctx . enable_morphling } "
f " { mission_variant } "
f " { trade_options } "
f " { self . ctx . difficulty_damage_modifier } "
f " { self . ctx . mercenary_highlanders } " # TODO: Possibly rework it into unit options in the next cycle
f " { self . ctx . war_council_nerfs } "
)
await self . update_resources ( start_items )
await self . update_terran_tech ( start_items )
await self . update_zerg_tech ( start_items , kerrigan_level )
await self . update_protoss_tech ( start_items )
await self . update_misc_tech ( start_items )
await self . update_colors ( )
if uncollected_objectives :
await self . chat_send ( " ?UncollectedLocations {} " . format (
functools . reduce ( lambda a , b : a + " " + b , [ str ( x ) for x in uncollected_objectives ] )
) )
await self . chat_send ( " ?LoadFinished " )
self . last_received_update = len ( self . ctx . items_received )
else :
if self . ctx . pending_color_update :
await self . update_colors ( )
if not self . ctx . announcements . empty ( ) :
message = self . ctx . announcements . get ( timeout = 1 )
await self . chat_send ( " ?SendMessage " + message )
self . ctx . announcements . task_done ( )
# Archipelago reads the health
controller1_state = 0
controller2_state = 0
for unit in self . all_own_units ( ) :
if unit . health_max == CONTROLLER_HEALTH :
controller1_state = int ( CONTROLLER_HEALTH - unit . health )
self . can_read_game = True
elif unit . health_max == CONTROLLER2_HEALTH :
controller2_state = int ( CONTROLLER2_HEALTH - unit . health )
self . can_read_game = True
elif unit . name == TRADE_UNIT :
# Handle Void Trade requests
# Check for orders (for buildings this is usually research or training)
if not unit . is_idle and not self . ctx . trade_underway :
button = unit . orders [ 0 ] . ability . button_name
if button == TRADE_SEND_BUTTON and len ( self . last_trade_cargo ) > 0 :
units_to_send : typing . List [ str ] = [ ]
non_ap_units : typing . Set [ str ] = set ( )
for passenger in self . last_trade_cargo :
# Alternatively passenger._type_data.name but passenger.name seems to always match
unit_name = passenger . name
if unit_name . startswith ( " AP_ " ) :
units_to_send . append ( normalized_unit_types . get ( unit_name , unit_name ) )
else :
non_ap_units . add ( unit_name )
if len ( non_ap_units ) > 0 :
sc2_logger . info ( f " Void Trade tried to send non-AP units: { ' , ' . join ( non_ap_units ) } " )
self . ctx . trade_response = " ?TradeFail Void Trade rejected: Trade contains invalid units. "
self . ctx . trade_underway = True
else :
self . ctx . trade_response = None
self . ctx . trade_underway = True
async_start ( self . ctx . trade_send ( units_to_send ) )
elif button == TRADE_RECEIVE_1_BUTTON :
self . ctx . trade_underway = True
if self . supply_used != self . last_supply_used :
self . ctx . trade_response = None
async_start ( self . ctx . trade_receive ( 1 ) )
else :
self . ctx . trade_response = " ?TradeFail Void Trade rejected: Not enough supply. "
elif button == TRADE_RECEIVE_5_BUTTON :
self . ctx . trade_underway = True
if self . supply_used != self . last_supply_used :
self . ctx . trade_response = None
async_start ( self . ctx . trade_receive ( 5 ) )
else :
self . ctx . trade_response = " ?TradeFail Void Trade rejected: Not enough supply. "
elif not unit . is_idle and self . trade_reply_cooldown > 0 :
self . trade_reply_cooldown - = 1
elif unit . is_idle and self . trade_reply_cooldown > 0 :
self . trade_reply_cooldown = 0
self . ctx . trade_response = None
self . ctx . trade_underway = False
else :
# The API returns no passengers for researching/training buildings,
# so we need to buffer the passengers each frame
self . last_trade_cargo = unit . passengers
# SC2 has no good means of detecting when a unit is queued while supply capped,
# so a supply buffer here is the best we can do
self . last_supply_used = self . supply_used
game_state = controller1_state + ( controller2_state << 15 )
if iteration == 160 and not game_state & 1 :
await self . chat_send ( " ?SendMessage Warning: Archipelago unable to connect or has lost connection to " +
" Starcraft 2 (This is likely a map issue) " )
if self . last_received_update < len ( self . ctx . items_received ) :
current_items = calculate_items ( self . ctx )
missions_beaten = self . missions_beaten_count ( )
kerrigan_level = get_kerrigan_level ( self . ctx , current_items , missions_beaten )
await self . update_resources ( current_items )
await self . update_terran_tech ( current_items )
await self . update_zerg_tech ( current_items , kerrigan_level )
await self . update_protoss_tech ( current_items )
await self . update_misc_tech ( current_items )
self . last_received_update = len ( self . ctx . items_received )
if game_state & 1 :
if not self . game_running :
print ( " Archipelago Connected " )
self . game_running = True
if self . can_read_game :
if game_state & ( 1 << 1 ) and not self . mission_completed :
victory_locations = [ get_location_id ( self . mission_id , 0 ) ]
send_victory = (
self . mission_id in self . ctx . final_mission_ids and
len ( self . ctx . final_locations ) == len ( self . ctx . checked_locations . union ( victory_locations ) . intersection ( self . ctx . final_locations ) )
)
# Old slots don't have locations on goal
if not send_victory or self . ctx . slot_data_version > = 4 :
sc2_logger . info ( " Mission Completed " )
location_ids = self . ctx . mission_id_to_location_ids [ self . mission_id ]
victory_locations + = sorted ( [
get_location_id ( self . mission_id , location_id )
for location_id in location_ids
if ( location_id % VICTORY_MODULO ) > = VICTORY_CACHE_OFFSET
] )
await self . ctx . send_msgs (
[ { " cmd " : ' LocationChecks ' ,
" locations " : victory_locations } ] )
self . mission_completed = True
if send_victory :
print ( " Game Complete " )
await self . ctx . send_msgs ( [ { " cmd " : ' StatusUpdate ' , " status " : ClientStatus . CLIENT_GOAL } ] )
self . mission_completed = True
self . ctx . finished_game = True
for x , completed in enumerate ( self . boni ) :
if not completed and game_state & ( 1 << ( x + 2 ) ) :
await self . ctx . send_msgs (
[ { " cmd " : ' LocationChecks ' ,
" locations " : [ get_location_id ( self . mission_id , x + 1 ) ] } ] )
self . boni [ x ] = True
# Send Void Trade results
if self . ctx . trade_response is not None and self . trade_reply_cooldown == 0 :
await self . chat_send ( self . ctx . trade_response )
# Wait an arbitrary amount of frames before trying again
self . trade_reply_cooldown = 60
else :
await self . chat_send ( " ?SendMessage LostConnection - Lost connection to game. " )
def get_uncollected_objectives ( self ) - > typing . List [ int ] :
result = [
location % VICTORY_MODULO
for location in self . ctx . uncollected_locations_in_mission ( lookup_id_to_mission [ self . mission_id ] )
if ( location % VICTORY_MODULO ) < VICTORY_CACHE_OFFSET
]
return result
def missions_beaten_count ( self ) - > int :
return len ( [ location for location in self . ctx . checked_locations if location % VICTORY_MODULO == 0 ] )
async def update_colors ( self ) :
await self . chat_send ( " ?SetColor rr " + str ( self . ctx . player_color_raynor ) )
await self . chat_send ( " ?SetColor ks " + str ( self . ctx . player_color_zerg ) )
await self . chat_send ( " ?SetColor pz " + str ( self . ctx . player_color_zerg_primal ) )
await self . chat_send ( " ?SetColor da " + str ( self . ctx . player_color_protoss ) )
await self . chat_send ( " ?SetColor nova " + str ( self . ctx . player_color_nova ) )
self . ctx . pending_color_update = False
async def update_resources ( self , current_items : typing . Dict [ SC2Race , typing . List [ int ] ] ) :
DEFAULT_MAX_SUPPLY = 200
max_supply_amount = max (
DEFAULT_MAX_SUPPLY
+ (
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . MAX_SUPPLY ) ]
* self . ctx . maximum_supply_per_item
)
- (
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . REDUCED_MAX_SUPPLY ) ]
* self . ctx . maximum_supply_reduction_per_item
) ,
self . ctx . lowest_maximum_supply ,
)
await self . chat_send ( " ?GiveResources {} {} {} {} " . format (
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . STARTING_MINERALS ) ] ,
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . STARTING_VESPENE ) ] ,
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . STARTING_SUPPLY ) ] ,
max_supply_amount - DEFAULT_MAX_SUPPLY ,
) )
async def update_terran_tech ( self , current_items : typing . Dict [ SC2Race , typing . List [ int ] ] ) :
terran_items = current_items [ SC2Race . TERRAN ]
await self . chat_send ( " ?GiveTerranTech " + " " . join ( map ( str , terran_items ) ) )
async def update_zerg_tech ( self , current_items : typing . Dict [ SC2Race , typing . List [ int ] ] , kerrigan_level : int ) :
zerg_items = current_items [ SC2Race . ZERG ]
zerg_items = [ value for index , value in enumerate ( zerg_items ) if index not in [ ZergItemType . Level . flag_word , ZergItemType . Primal_Form . flag_word ] ]
kerrigan_primal_by_items = kerrigan_primal ( self . ctx , kerrigan_level )
kerrigan_primal_bot_value = 1 if kerrigan_primal_by_items else 0
await self . chat_send ( f " ?GiveZergTech { kerrigan_level } { kerrigan_primal_bot_value } " + ' ' . join ( map ( str , zerg_items ) ) )
async def update_protoss_tech ( self , current_items : typing . Dict [ SC2Race , typing . List [ int ] ] ) :
protoss_items = current_items [ SC2Race . PROTOSS ]
await self . chat_send ( " ?GiveProtossTech " + " " . join ( map ( str , protoss_items ) ) )
async def update_misc_tech ( self , current_items : typing . Dict [ SC2Race , typing . List [ int ] ] ) :
await self . chat_send ( " ?GiveMiscTech {} {} {} " . format (
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . BUILDING_CONSTRUCTION_SPEED ) ] ,
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . UPGRADE_RESEARCH_SPEED ) ] ,
current_items [ SC2Race . ANY ] [ get_item_flag_word ( item_names . UPGRADE_RESEARCH_COST ) ] ,
) )
def calc_unfinished_nodes (
ctx : SC2Context
) - > typing . Tuple [ typing . List [ int ] , typing . Dict [ int , typing . List [ int ] ] , typing . List [ int ] , typing . Set [ int ] ] :
unfinished_missions : typing . Set [ int ] = set ( )
available_missions , available_layouts , available_campaigns = calc_available_nodes ( ctx )
for mission_id in available_missions :
objectives = set ( ctx . locations_for_mission_id ( mission_id ) )
if objectives :
objectives_completed = ctx . checked_locations & objectives
if len ( objectives_completed ) < len ( objectives ) :
unfinished_missions . add ( mission_id )
return available_missions , available_layouts , available_campaigns , unfinished_missions
def is_mission_available ( ctx : SC2Context , mission_id_to_check : int ) - > bool :
available_missions , _ , _ = calc_available_nodes ( ctx )
return mission_id_to_check in available_missions
def calc_available_nodes ( ctx : SC2Context ) - > typing . Tuple [ typing . List [ int ] , typing . Dict [ int , typing . List [ int ] ] , typing . List [ int ] ] :
beaten_missions : typing . Set [ int ] = { mission_id for mission_id in ctx . mission_id_to_entry_rules if ctx . is_mission_completed ( mission_id ) }
received_items = compute_received_items ( ctx )
mission_order_objects : typing . List [ MissionOrderObjectSlotData ] = [ ]
parent_objects : typing . List [ typing . List [ MissionOrderObjectSlotData ] ] = [ ]
for campaign in ctx . custom_mission_order :
mission_order_objects . append ( campaign )
parent_objects . append ( [ ] )
for layout in campaign . layouts :
mission_order_objects . append ( layout )
parent_objects . append ( [ campaign ] )
for column in layout . missions :
for mission in column :
if mission . mission_id == - 1 :
continue
mission_order_objects . append ( mission )
parent_objects . append ( [ campaign , layout ] )
candidate_accessible_objects : typing . List [ MissionOrderObjectSlotData ] = [
mission_order_object for mission_order_object in mission_order_objects
if mission_order_object . entry_rule . is_accessible ( beaten_missions , received_items )
]
accessible_objects : typing . List [ MissionOrderObjectSlotData ] = [ ]
while len ( candidate_accessible_objects ) > 0 :
accessible_missions : typing . List [ MissionSlotData ] = [ mission_order_object for mission_order_object in accessible_objects if isinstance ( mission_order_object , MissionSlotData ) ]
beaten_accessible_missions : typing . Set [ int ] = { mission . mission_id for mission in accessible_missions if mission . mission_id in beaten_missions }
accessible_objects_to_add : typing . List [ MissionOrderObjectSlotData ] = [ ]
for mission_order_object in candidate_accessible_objects :
if (
mission_order_object . entry_rule . is_accessible ( beaten_accessible_missions , received_items )
and all ( [
parent_object . entry_rule . is_accessible ( beaten_accessible_missions , received_items )
for parent_object in parent_objects [ mission_order_objects . index ( mission_order_object ) ]
] )
) :
accessible_objects_to_add . append ( mission_order_object )
if len ( accessible_objects_to_add ) > 0 :
accessible_objects . extend ( accessible_objects_to_add )
candidate_accessible_objects = [
mission_order_object for mission_order_object in candidate_accessible_objects
if mission_order_object not in accessible_objects_to_add
]
else :
break
accessible_missions : typing . List [ MissionSlotData ] = [ mission_order_object for mission_order_object in accessible_objects if isinstance ( mission_order_object , MissionSlotData ) ]
beaten_accessible_missions : typing . Set [ int ] = { mission . mission_id for mission in accessible_missions if mission . mission_id in beaten_missions }
for mission_order_object in mission_order_objects :
# re-generate tooltip accessibility
for sub_rule in mission_order_object . entry_rule . sub_rules :
sub_rule . was_accessible = False
mission_order_object . entry_rule . is_accessible ( beaten_accessible_missions , received_items )
available_missions : typing . List [ int ] = [
mission_order_object . mission_id for mission_order_object in accessible_objects
if isinstance ( mission_order_object , MissionSlotData )
]
available_campaign_objects : typing . List [ CampaignSlotData ] = [
mission_order_object for mission_order_object in accessible_objects
if isinstance ( mission_order_object , CampaignSlotData )
]
available_campaigns : typing . List [ int ] = [
campaign_idx for campaign_idx , campaign in enumerate ( ctx . custom_mission_order )
if campaign in available_campaign_objects
]
available_layout_objects : typing . List [ LayoutSlotData ] = [
mission_order_object for mission_order_object in accessible_objects
if isinstance ( mission_order_object , LayoutSlotData )
]
available_layouts : typing . Dict [ int , typing . List [ int ] ] = {
campaign_idx : [
layout_idx for layout_idx , layout in enumerate ( campaign . layouts ) if layout in available_layout_objects
]
for campaign_idx , campaign in enumerate ( ctx . custom_mission_order )
}
return available_missions , available_layouts , available_campaigns
def compute_received_items ( ctx : SC2Context ) - > typing . Counter [ int ] :
received_items : typing . Counter [ int ] = collections . Counter ( )
for network_item in ctx . items_received :
received_items [ network_item . item ] + = 1
return received_items
def check_game_install_path ( ) - > bool :
# First thing: go to the default location for ExecuteInfo.
# An exception for Windows is included because it's very difficult to find ~\Documents if the user moved it.
if is_windows :
# The next five lines of utterly inscrutable code are brought to you by copy-paste from Stack Overflow.
# https://stackoverflow.com/questions/6227590/finding-the-users-my-documents-path/30924555#
import ctypes . wintypes
CSIDL_PERSONAL = 5 # My Documents
SHGFP_TYPE_CURRENT = 0 # Get current, not default value
buf = ctypes . create_unicode_buffer ( ctypes . wintypes . MAX_PATH )
ctypes . windll . shell32 . SHGetFolderPathW ( None , CSIDL_PERSONAL , None , SHGFP_TYPE_CURRENT , buf )
documentspath : str = buf . value
einfo = str ( documentspath / Path ( " StarCraft II \\ ExecuteInfo.txt " ) )
else :
einfo = str ( bot . paths . get_home ( ) / Path ( bot . paths . USERPATH [ bot . paths . PF ] ) )
# Check if the file exists.
if os . path . isfile ( einfo ) :
# Open the file and read it, picking out the latest executable's path.
with open ( einfo ) as f :
content = f . read ( )
if content :
search_result = re . search ( r " = (.*)Versions " , content )
if not search_result :
sc2_logger . warning ( f " Found { einfo } , but it was empty. Run SC2 through the Blizzard launcher, "
" then try again. " )
return False
base = search_result . group ( 1 )
if os . path . exists ( base ) :
executable = bot . paths . latest_executeble ( Path ( base ) . expanduser ( ) / " Versions " )
# Finally, check the path for an actual executable.
# If we find one, great. Set up the SC2PATH.
if os . path . isfile ( executable ) :
sc2_logger . info ( f " Found an SC2 install at { base } ! " )
sc2_logger . debug ( f " Latest executable at { executable } . " )
os . environ [ " SC2PATH " ] = base
sc2_logger . debug ( f " SC2PATH set to { base } . " )
return True
else :
sc2_logger . warning ( f " We may have found an SC2 install at { base } , but couldn ' t find { executable } . " )
else :
sc2_logger . warning ( f " { einfo } pointed to { base } , but we could not find an SC2 install there. " )
else :
sc2_logger . warning ( f " Couldn ' t find { einfo } . Run SC2 through the Blizzard launcher, then try again. "
f " If that fails, please run /set_path with your SC2 install directory. " )
return False
def is_mod_installed_correctly ( ) - > bool :
""" Searches for all required files. """
if " SC2PATH " not in os . environ :
check_game_install_path ( )
sc2_path : str = os . environ [ " SC2PATH " ]
mapdir = sc2_path / Path ( ' Maps/ArchipelagoCampaign ' )
mods = [ " ArchipelagoCore " , " ArchipelagoPlayer " , " ArchipelagoPlayerSuper " , " ArchipelagoPatches " ,
" ArchipelagoTriggers " , " ArchipelagoPlayerWoL " , " ArchipelagoPlayerHotS " ,
" ArchipelagoPlayerLotV " , " ArchipelagoPlayerLotVPrologue " , " ArchipelagoPlayerNCO " ]
modfiles = [ sc2_path / Path ( " Mods/ " + mod + " .SC2Mod " ) for mod in mods ]
wol_required_maps : typing . List [ str ] = [ " WoL " + os . sep + mission . map_file + " .SC2Map " for mission in SC2Mission
if mission . campaign in ( SC2Campaign . WOL , SC2Campaign . PROPHECY ) ]
hots_required_maps : typing . List [ str ] = [ " HotS " + os . sep + mission . map_file + " .SC2Map " for mission in campaign_mission_table [ SC2Campaign . HOTS ] ]
lotv_required_maps : typing . List [ str ] = [ " LotV " + os . sep + mission . map_file + " .SC2Map " for mission in SC2Mission
if mission . campaign in ( SC2Campaign . LOTV , SC2Campaign . PROLOGUE , SC2Campaign . EPILOGUE ) ]
nco_required_maps : typing . List [ str ] = [ " NCO " + os . sep + mission . map_file + " .SC2Map " for mission in campaign_mission_table [ SC2Campaign . NCO ] ]
required_maps = wol_required_maps + hots_required_maps + lotv_required_maps + nco_required_maps
needs_files = False
# Check for maps.
missing_maps : typing . List [ str ] = [ ]
for mapfile in required_maps :
if not os . path . isfile ( mapdir / mapfile ) :
missing_maps . append ( mapfile )
if len ( missing_maps ) > = 19 :
sc2_logger . warning ( f " All map files missing from { mapdir } . " )
needs_files = True
elif len ( missing_maps ) > 0 :
for map in missing_maps :
sc2_logger . debug ( f " Missing { map } from { mapdir } . " )
sc2_logger . warning ( f " Missing { len ( missing_maps ) } map files. " )
needs_files = True
else : # Must be no maps missing
sc2_logger . debug ( f " All maps found in { mapdir } . " )
# Check for mods.
for modfile in modfiles :
if os . path . isfile ( modfile ) or os . path . isdir ( modfile ) :
sc2_logger . debug ( f " Archipelago mod found at { modfile } . " )
else :
sc2_logger . warning ( f " Archipelago mod could not be found at { modfile } . " )
needs_files = True
# Final verdict.
if needs_files :
sc2_logger . warning ( " Required files are missing. Run /download_data to acquire them. " )
return False
else :
sc2_logger . debug ( " All map/mod files are properly installed. " )
return True
class DllDirectory :
# Credit to Black Sliver for this code.
# More info: https://docs.microsoft.com/en-us/windows/win32/api/winbase/nf-winbase-setdlldirectoryw
_old : typing . Optional [ str ] = None
_new : typing . Optional [ str ] = None
def __init__ ( self , new : typing . Optional [ str ] ) :
self . _new = new
def __enter__ ( self ) :
old = self . get ( )
if self . set ( self . _new ) :
self . _old = old
def __exit__ ( self , * args ) :
if self . _old is not None :
self . set ( self . _old )
@staticmethod
def get ( ) - > typing . Optional [ str ] :
if sys . platform == " win32 " :
n = ctypes . windll . kernel32 . GetDllDirectoryW ( 0 , None )
buf = ctypes . create_unicode_buffer ( n )
ctypes . windll . kernel32 . GetDllDirectoryW ( n , buf )
return buf . value
# NOTE: other OS may support os.environ["LD_LIBRARY_PATH"], but this fix is windows-specific
return None
@staticmethod
def set ( s : typing . Optional [ str ] ) - > bool :
if sys . platform == " win32 " :
return ctypes . windll . kernel32 . SetDllDirectoryW ( s ) != 0
# NOTE: other OS may support os.environ["LD_LIBRARY_PATH"], but this fix is windows-specific
return False
def download_latest_release_zip (
owner : str ,
repo : str ,
api_version : str ,
metadata : typing . Optional [ str ] = None ,
force_download = False
) - > typing . Tuple [ str , typing . Optional [ str ] ] :
""" Downloads the latest release of a GitHub repo to the current directory as a .zip file. """
import requests
headers = { " Accept " : ' application/vnd.github.v3+json ' }
url = f " https://api.github.com/repos/ { owner } / { repo } /releases/tags/ { api_version } "
try :
r1 = requests . get ( url , headers = headers )
if r1 . status_code == 200 :
latest_metadata = r1 . json ( )
cleanup_downloaded_metadata ( latest_metadata )
latest_metadata = str ( latest_metadata )
# sc2_logger.info(f"Latest version: {latest_metadata}.")
else :
sc2_logger . warning ( f " Status code: { r1 . status_code } " )
sc2_logger . warning ( " Failed to reach GitHub. Could not find download link. " )
sc2_logger . warning ( f " text: { r1 . text } " )
return " " , metadata
if ( force_download is False ) and ( metadata == latest_metadata ) :
sc2_logger . info ( " Latest version already installed. " )
return " " , metadata
sc2_logger . info ( f " Attempting to download latest version of API version { api_version } of { repo } . " )
download_url = r1 . json ( ) [ " assets " ] [ 0 ] [ " browser_download_url " ]
r2 = requests . get ( download_url , headers = headers )
if r2 . status_code == 200 and zipfile . is_zipfile ( io . BytesIO ( r2 . content ) ) :
tempdir = tempfile . gettempdir ( )
file = tempdir + os . sep + f " { repo } .zip "
with open ( file , " wb " ) as fh :
fh . write ( r2 . content )
sc2_logger . info ( f " Successfully downloaded { repo } .zip. Installing... " )
return file , latest_metadata
else :
sc2_logger . warning ( f " Status code: { r2 . status_code } " )
sc2_logger . warning ( " Download failed. " )
sc2_logger . warning ( f " text: { r2 . text } " )
return " " , metadata
except requests . ConnectionError :
sc2_logger . warning ( " Failed to reach GitHub. Could not find download link. " )
return " " , metadata
def cleanup_downloaded_metadata ( medatada_json : dict ) - > None :
for asset in medatada_json [ ' assets ' ] :
del asset [ ' download_count ' ]
def is_mod_update_available ( owner : str , repo : str , api_version : str , metadata : str ) - > bool :
import requests
headers = { " Accept " : ' application/vnd.github.v3+json ' }
url = f " https://api.github.com/repos/ { owner } / { repo } /releases/tags/ { api_version } "
try :
r1 = requests . get ( url , headers = headers )
if r1 . status_code == 200 :
latest_metadata = r1 . json ( )
cleanup_downloaded_metadata ( latest_metadata )
latest_metadata = str ( latest_metadata )
if metadata != latest_metadata :
return True
else :
return False
else :
sc2_logger . warning ( " Failed to reach GitHub while checking for updates. " )
sc2_logger . warning ( f " Status code: { r1 . status_code } " )
sc2_logger . warning ( f " text: { r1 . text } " )
return False
except requests . ConnectionError :
sc2_logger . warning ( " Failed to reach GitHub while checking for updates. " )
return False
def get_location_offset ( mission_id : int ) - > int :
return SC2WOL_LOC_ID_OFFSET if mission_id < = SC2Mission . ALL_IN . id \
else ( SC2HOTS_LOC_ID_OFFSET - SC2Mission . ALL_IN . id * VICTORY_MODULO )
def get_location_id ( mission_id : int , objective_id : int ) - > int :
return get_location_offset ( mission_id ) + mission_id * VICTORY_MODULO + objective_id
_has_forced_save = False
def force_settings_save_on_close ( ) - > None :
"""
Settings has an existing auto - save feature , but it only triggers if a new key was introduced .
Force it to mark things as changed by introducing a new key and then cleaning up .
"""
global _has_forced_save
if _has_forced_save :
return
SC2World . settings . update ( { ' invalid_attribute ' : True } )
del SC2World . settings . invalid_attribute
_has_forced_save = True
2025-09-30 09:34:26 -07:00
def launch ( * args : str ) :
2025-09-02 17:40:58 +02:00
colorama . just_fix_windows_console ( )
2025-09-30 09:34:26 -07:00
asyncio . run ( main ( args ) )
2025-09-02 17:40:58 +02:00
colorama . deinit ( )