mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00
some typing and cleaning, mostly in Fill.py (#349)
* some typing and cleaning, mostly in Fill.py * address missing Option types * resolve a few TODOs discussed in pull request
This commit is contained in:
117
Fill.py
117
Fill.py
@@ -13,7 +13,7 @@ class FillError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item] = tuple()):
|
||||
def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item] = tuple()) -> CollectionState:
|
||||
new_state = base_state.copy()
|
||||
for item in itempool:
|
||||
new_state.collect(item, True)
|
||||
@@ -22,12 +22,12 @@ def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item]
|
||||
|
||||
|
||||
def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations: typing.List[Location],
|
||||
itempool: typing.List[Item], single_player_placement=False, lock=False):
|
||||
unplaced_items = []
|
||||
itempool: typing.List[Item], single_player_placement: bool = False, lock: bool = False) -> None:
|
||||
unplaced_items: typing.List[Item] = []
|
||||
placements: typing.List[Location] = []
|
||||
|
||||
swapped_items = Counter()
|
||||
reachable_items: typing.Dict[int, deque] = {}
|
||||
swapped_items: typing.Counter[typing.Tuple[int, str]] = Counter()
|
||||
reachable_items: typing.Dict[int, typing.Deque[Item]] = {}
|
||||
for item in itempool:
|
||||
reachable_items.setdefault(item.player, deque()).append(item)
|
||||
|
||||
@@ -46,7 +46,8 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
|
||||
spot_to_fill: typing.Optional[Location] = None
|
||||
if world.accessibility[item_to_place.player] == 'minimal':
|
||||
perform_access_check = not world.has_beaten_game(maximum_exploration_state,
|
||||
item_to_place.player) if single_player_placement else not has_beaten_game
|
||||
item_to_place.player) \
|
||||
if single_player_placement else not has_beaten_game
|
||||
else:
|
||||
perform_access_check = True
|
||||
|
||||
@@ -127,18 +128,18 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
|
||||
itempool.extend(unplaced_items)
|
||||
|
||||
|
||||
def distribute_items_restrictive(world: MultiWorld):
|
||||
def distribute_items_restrictive(world: MultiWorld) -> None:
|
||||
fill_locations = sorted(world.get_unfilled_locations())
|
||||
world.random.shuffle(fill_locations)
|
||||
|
||||
# get items to distribute
|
||||
itempool = sorted(world.itempool)
|
||||
world.random.shuffle(itempool)
|
||||
progitempool = []
|
||||
nonexcludeditempool = []
|
||||
localrestitempool = {player: [] for player in range(1, world.players + 1)}
|
||||
nonlocalrestitempool = []
|
||||
restitempool = []
|
||||
progitempool: typing.List[Item] = []
|
||||
nonexcludeditempool: typing.List[Item] = []
|
||||
localrestitempool: typing.Dict[int, typing.List[Item]] = {player: [] for player in range(1, world.players + 1)}
|
||||
nonlocalrestitempool: typing.List[Item] = []
|
||||
restitempool: typing.List[Item] = []
|
||||
|
||||
for item in itempool:
|
||||
if item.advancement:
|
||||
@@ -188,7 +189,7 @@ def distribute_items_restrictive(world: MultiWorld):
|
||||
world.random.shuffle(defaultlocations)
|
||||
|
||||
if any(localrestitempool.values()): # we need to make sure some fills are limited to certain worlds
|
||||
local_locations = {player: [] for player in world.player_ids}
|
||||
local_locations: typing.Dict[int, typing.List[Location]] = {player: [] for player in world.player_ids}
|
||||
for location in defaultlocations:
|
||||
local_locations[location.player].append(location)
|
||||
for player_locations in local_locations.values():
|
||||
@@ -232,15 +233,16 @@ def distribute_items_restrictive(world: MultiWorld):
|
||||
logging.info(f'Per-Player counts: {print_data})')
|
||||
|
||||
|
||||
def fast_fill(world: MultiWorld, item_pool: typing.List, fill_locations: typing.List) -> typing.Tuple[
|
||||
typing.List, typing.List]:
|
||||
def fast_fill(world: MultiWorld,
|
||||
item_pool: typing.List[Item],
|
||||
fill_locations: typing.List[Location]) -> typing.Tuple[typing.List[Item], typing.List[Location]]:
|
||||
placing = min(len(item_pool), len(fill_locations))
|
||||
for item, location in zip(item_pool, fill_locations):
|
||||
world.push_item(location, item, False)
|
||||
return item_pool[placing:], fill_locations[placing:]
|
||||
|
||||
|
||||
def flood_items(world: MultiWorld):
|
||||
def flood_items(world: MultiWorld) -> None:
|
||||
# get items to distribute
|
||||
world.random.shuffle(world.itempool)
|
||||
itempool = world.itempool
|
||||
@@ -279,7 +281,8 @@ def flood_items(world: MultiWorld):
|
||||
item_to_place = item
|
||||
break
|
||||
|
||||
# we might be in a situation where all new locations require multiple items to reach. If that is the case, just place any advancement item we've found and continue trying
|
||||
# we might be in a situation where all new locations require multiple items to reach.
|
||||
# If that is the case, just place any advancement item we've found and continue trying
|
||||
if item_to_place is None:
|
||||
if candidate_item_to_place is not None:
|
||||
item_to_place = candidate_item_to_place
|
||||
@@ -300,7 +303,7 @@ def flood_items(world: MultiWorld):
|
||||
break
|
||||
|
||||
|
||||
def balance_multiworld_progression(world: MultiWorld):
|
||||
def balance_multiworld_progression(world: MultiWorld) -> None:
|
||||
# A system to reduce situations where players have no checks remaining, popularly known as "BK mode."
|
||||
# Overall progression balancing algorithm:
|
||||
# Gather up all locations in a sphere.
|
||||
@@ -313,24 +316,30 @@ def balance_multiworld_progression(world: MultiWorld):
|
||||
else:
|
||||
logging.info(f'Balancing multiworld progression for {len(balanceable_players)} Players.')
|
||||
state = CollectionState(world)
|
||||
checked_locations = set()
|
||||
checked_locations: typing.Set[Location] = set()
|
||||
unchecked_locations = set(world.get_locations())
|
||||
|
||||
reachable_locations_count = {player: 0 for player in world.player_ids if len(world.get_filled_locations(player)) != 0}
|
||||
reachable_locations_count = {
|
||||
player: 0
|
||||
for player in world.player_ids
|
||||
if len(world.get_filled_locations(player)) != 0
|
||||
}
|
||||
total_locations_count = Counter(location.player for location in world.get_locations() if not location.locked)
|
||||
balanceable_players = {player for player in balanceable_players if total_locations_count[player]}
|
||||
sphere_num = 1
|
||||
moved_item_count = 0
|
||||
|
||||
def get_sphere_locations(sphere_state, locations):
|
||||
def get_sphere_locations(sphere_state: CollectionState,
|
||||
locations: typing.Set[Location]) -> typing.Set[Location]:
|
||||
sphere_state.sweep_for_events(key_only=True, locations=locations)
|
||||
return {loc for loc in locations if sphere_state.can_reach(loc)}
|
||||
|
||||
def item_percentage(player, num):
|
||||
def item_percentage(player: int, num: int) -> float:
|
||||
return num / total_locations_count[player]
|
||||
|
||||
while True:
|
||||
# Gather non-locked locations. This ensures that only shuffled locations get counted for progression balancing,
|
||||
# Gather non-locked locations.
|
||||
# This ensures that only shuffled locations get counted for progression balancing,
|
||||
# i.e. the items the players will be checking.
|
||||
sphere_locations = get_sphere_locations(state, unchecked_locations)
|
||||
for location in sphere_locations:
|
||||
@@ -340,12 +349,18 @@ def balance_multiworld_progression(world: MultiWorld):
|
||||
|
||||
logging.debug(f"Sphere {sphere_num}")
|
||||
logging.debug(f"Reachable locations: {reachable_locations_count}")
|
||||
logging.debug(f"Reachable percentages: { {player: round(item_percentage(player, num), 2) for player, num in reachable_locations_count.items()} }\n")
|
||||
debug_percentages = {
|
||||
player: round(item_percentage(player, num), 2)
|
||||
for player, num in reachable_locations_count.items()
|
||||
}
|
||||
logging.debug(f"Reachable percentages: {debug_percentages}\n")
|
||||
sphere_num += 1
|
||||
|
||||
if checked_locations:
|
||||
# The 10% threshold can be modified for "progression balancing strength" -- right now it approximates the old 20/216 bound.
|
||||
threshold_percentage = max(map(lambda p: item_percentage(p, reachable_locations_count[p]), reachable_locations_count)) - 0.10
|
||||
# The 10% threshold can be modified for "progression balancing strength"
|
||||
# right now it approximates the old 20/216 bound.
|
||||
threshold_percentage = max(map(lambda p: item_percentage(p, reachable_locations_count[p]),
|
||||
reachable_locations_count)) - 0.10
|
||||
logging.debug(f"Threshold: {threshold_percentage}")
|
||||
balancing_players = {player for player, reachables in reachable_locations_count.items() if
|
||||
item_percentage(player, reachables) < threshold_percentage and player in balanceable_players}
|
||||
@@ -354,7 +369,7 @@ def balance_multiworld_progression(world: MultiWorld):
|
||||
balancing_unchecked_locations = unchecked_locations.copy()
|
||||
balancing_reachables = reachable_locations_count.copy()
|
||||
balancing_sphere = sphere_locations.copy()
|
||||
candidate_items = collections.defaultdict(set)
|
||||
candidate_items: typing.Dict[int, typing.Set[Location]] = collections.defaultdict(set)
|
||||
while True:
|
||||
# Check locations in the current sphere and gather progression items to swap earlier
|
||||
for location in balancing_sphere:
|
||||
@@ -380,19 +395,21 @@ def balance_multiworld_progression(world: MultiWorld):
|
||||
elif not balancing_sphere:
|
||||
raise RuntimeError('Not all required items reachable. Something went terribly wrong here.')
|
||||
# Gather a set of locations which we can swap items into
|
||||
unlocked_locations = collections.defaultdict(set)
|
||||
unlocked_locations: typing.Dict[int, typing.Set[Location]] = collections.defaultdict(set)
|
||||
for l in unchecked_locations:
|
||||
if l not in balancing_unchecked_locations:
|
||||
unlocked_locations[l.player].add(l)
|
||||
items_to_replace = []
|
||||
items_to_replace: typing.List[Location] = []
|
||||
for player in balancing_players:
|
||||
locations_to_test = unlocked_locations[player]
|
||||
items_to_test = candidate_items[player]
|
||||
while items_to_test:
|
||||
testing = items_to_test.pop()
|
||||
reducing_state = state.copy()
|
||||
for location in itertools.chain((l for l in items_to_replace if l.item.player == player),
|
||||
items_to_test):
|
||||
for location in itertools.chain((
|
||||
l for l in items_to_replace
|
||||
if l.item.player == player
|
||||
), items_to_test):
|
||||
reducing_state.collect(location.item, True, location)
|
||||
|
||||
reducing_state.sweep_for_events(locations=locations_to_test)
|
||||
@@ -402,7 +419,8 @@ def balance_multiworld_progression(world: MultiWorld):
|
||||
items_to_replace.append(testing)
|
||||
else:
|
||||
reduced_sphere = get_sphere_locations(reducing_state, locations_to_test)
|
||||
if item_percentage(player, reachable_locations_count[player] + len(reduced_sphere)) < threshold_percentage:
|
||||
p = item_percentage(player, reachable_locations_count[player] + len(reduced_sphere))
|
||||
if p < threshold_percentage:
|
||||
items_to_replace.append(testing)
|
||||
|
||||
replaced_items = False
|
||||
@@ -452,7 +470,7 @@ def balance_multiworld_progression(world: MultiWorld):
|
||||
break
|
||||
|
||||
|
||||
def swap_location_item(location_1: Location, location_2: Location, check_locked=True):
|
||||
def swap_location_item(location_1: Location, location_2: Location, check_locked: bool = True) -> None:
|
||||
"""Swaps Items of locations. Does NOT swap flags like shop_slot or locked, but does swap event"""
|
||||
if check_locked:
|
||||
if location_1.locked:
|
||||
@@ -465,14 +483,14 @@ def swap_location_item(location_1: Location, location_2: Location, check_locked=
|
||||
location_1.event, location_2.event = location_2.event, location_1.event
|
||||
|
||||
|
||||
def distribute_planned(world: MultiWorld):
|
||||
def warn(warning: str, force):
|
||||
def distribute_planned(world: MultiWorld) -> None:
|
||||
def warn(warning: str, force: typing.Union[bool, str]) -> None:
|
||||
if force in [True, 'fail', 'failure', 'none', False, 'warn', 'warning']:
|
||||
logging.warning(f'{warning}')
|
||||
else:
|
||||
logging.debug(f'{warning}')
|
||||
|
||||
def failed(warning: str, force):
|
||||
def failed(warning: str, force: typing.Union[bool, str]) -> None:
|
||||
if force in [True, 'fail', 'failure']:
|
||||
raise Exception(warning)
|
||||
else:
|
||||
@@ -482,7 +500,8 @@ def distribute_planned(world: MultiWorld):
|
||||
from worlds.alttp.Regions import key_drop_data
|
||||
world_name_lookup = world.world_name_lookup
|
||||
|
||||
plando_blocks = []
|
||||
block_value = typing.Union[typing.List[str], typing.Dict[str, typing.Any], str]
|
||||
plando_blocks: typing.List[typing.Dict[str, typing.Any]] = []
|
||||
player_ids = set(world.player_ids)
|
||||
for player in player_ids:
|
||||
for block in world.plando_items[player]:
|
||||
@@ -493,7 +512,7 @@ def distribute_planned(world: MultiWorld):
|
||||
block['from_pool'] = True
|
||||
if 'world' not in block:
|
||||
block['world'] = False
|
||||
items = []
|
||||
items: block_value = []
|
||||
if "items" in block:
|
||||
items = block["items"]
|
||||
if 'count' not in block:
|
||||
@@ -506,7 +525,7 @@ def distribute_planned(world: MultiWorld):
|
||||
failed("You must specify at least one item to place items with plando.", block['force'])
|
||||
continue
|
||||
if isinstance(items, dict):
|
||||
item_list = []
|
||||
item_list: typing.List[str] = []
|
||||
for key, value in items.items():
|
||||
if value is True:
|
||||
value = world.itempool.count(world.worlds[player].create_item(key))
|
||||
@@ -516,7 +535,7 @@ def distribute_planned(world: MultiWorld):
|
||||
items = [items]
|
||||
block['items'] = items
|
||||
|
||||
locations = []
|
||||
locations: block_value = []
|
||||
if 'location' in block:
|
||||
locations = block['location'] # just allow 'location' to keep old yamls compatible
|
||||
elif 'locations' in block:
|
||||
@@ -529,8 +548,6 @@ def distribute_planned(world: MultiWorld):
|
||||
for key, value in locations.items():
|
||||
location_list += [key] * value
|
||||
locations = location_list
|
||||
if isinstance(locations, str):
|
||||
locations = [locations]
|
||||
block['locations'] = locations
|
||||
|
||||
if not block['count']:
|
||||
@@ -572,20 +589,19 @@ def distribute_planned(world: MultiWorld):
|
||||
maxcount = placement['count']['target']
|
||||
from_pool = placement['from_pool']
|
||||
if target_world is False or world.players == 1: # target own world
|
||||
worlds = {player}
|
||||
worlds: typing.Set[int] = {player}
|
||||
elif target_world is True: # target any worlds besides own
|
||||
worlds = set(world.player_ids) - {player}
|
||||
elif target_world is None: # target all worlds
|
||||
worlds = set(world.player_ids)
|
||||
elif type(target_world) == list: # list of target worlds
|
||||
worlds = []
|
||||
worlds = set()
|
||||
for listed_world in target_world:
|
||||
if listed_world not in world_name_lookup:
|
||||
failed(f"Cannot place item to {target_world}'s world as that world does not exist.",
|
||||
placement['force'])
|
||||
continue
|
||||
worlds.append(world_name_lookup[listed_world])
|
||||
worlds = set(worlds)
|
||||
worlds.add(world_name_lookup[listed_world])
|
||||
elif type(target_world) == int: # target world by slot number
|
||||
if target_world not in range(1, world.players + 1):
|
||||
failed(
|
||||
@@ -605,8 +621,8 @@ def distribute_planned(world: MultiWorld):
|
||||
world.random.shuffle(candidates)
|
||||
world.random.shuffle(items)
|
||||
count = 0
|
||||
err = []
|
||||
successful_pairs = []
|
||||
err: typing.List[str] = []
|
||||
successful_pairs: typing.List[typing.Tuple[Item, Location]] = []
|
||||
for item_name in items:
|
||||
item = world.worlds[player].create_item(item_name)
|
||||
for location in reversed(candidates):
|
||||
@@ -617,7 +633,7 @@ def distribute_planned(world: MultiWorld):
|
||||
if not location.item:
|
||||
if location.item_rule(item):
|
||||
if location.can_fill(world.state, item, False):
|
||||
successful_pairs.append([item, location])
|
||||
successful_pairs.append((item, location))
|
||||
candidates.remove(location)
|
||||
count = count + 1
|
||||
break
|
||||
@@ -630,10 +646,9 @@ def distribute_planned(world: MultiWorld):
|
||||
if count == maxcount:
|
||||
break
|
||||
if count < placement['count']['min']:
|
||||
err = " ".join(err)
|
||||
m = placement['count']['min']
|
||||
failed(
|
||||
f"Plando block failed to place {m - count} of {m} item(s) for {world.player_name[player]}, error(s): {err}",
|
||||
f"Plando block failed to place {m - count} of {m} item(s) for {world.player_name[player]}, error(s): {' '.join(err)}",
|
||||
placement['force'])
|
||||
for (item, location) in successful_pairs:
|
||||
world.push_item(location, item, collect=False)
|
||||
|
Reference in New Issue
Block a user