Core: Purge the evil (world: MultiWorld) (#2749)

* Purge the evil

* Some files didn't save

* Fix a couple of missed string references

* multi_world -> multiworld
This commit is contained in:
PoryGone
2024-02-04 18:38:00 -05:00
committed by GitHub
parent 6c19bc42bb
commit 281fe01c25
7 changed files with 429 additions and 429 deletions

186
Fill.py
View File

@@ -27,12 +27,12 @@ def sweep_from_pool(base_state: CollectionState, itempool: typing.Sequence[Item]
return new_state
def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations: typing.List[Location],
def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locations: typing.List[Location],
item_pool: typing.List[Item], single_player_placement: bool = False, lock: bool = False,
swap: bool = True, on_place: typing.Optional[typing.Callable[[Location], None]] = None,
allow_partial: bool = False, allow_excluded: bool = False, name: str = "Unknown") -> None:
"""
:param world: Multiworld to be filled.
:param multiworld: Multiworld to be filled.
:param base_state: State assumed before fill.
:param locations: Locations to be filled with item_pool
:param item_pool: Items to fill into the locations
@@ -68,7 +68,7 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
maximum_exploration_state = sweep_from_pool(
base_state, item_pool + unplaced_items)
has_beaten_game = world.has_beaten_game(maximum_exploration_state)
has_beaten_game = multiworld.has_beaten_game(maximum_exploration_state)
while items_to_place:
# if we have run out of locations to fill,break out of this loop
@@ -80,8 +80,8 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
spot_to_fill: typing.Optional[Location] = None
# if minimal accessibility, only check whether location is reachable if game not beatable
if world.worlds[item_to_place.player].options.accessibility == Accessibility.option_minimal:
perform_access_check = not world.has_beaten_game(maximum_exploration_state,
if multiworld.worlds[item_to_place.player].options.accessibility == Accessibility.option_minimal:
perform_access_check = not multiworld.has_beaten_game(maximum_exploration_state,
item_to_place.player) \
if single_player_placement else not has_beaten_game
else:
@@ -122,11 +122,11 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
# Verify placing this item won't reduce available locations, which would be a useless swap.
prev_state = swap_state.copy()
prev_loc_count = len(
world.get_reachable_locations(prev_state))
multiworld.get_reachable_locations(prev_state))
swap_state.collect(item_to_place, True)
new_loc_count = len(
world.get_reachable_locations(swap_state))
multiworld.get_reachable_locations(swap_state))
if new_loc_count >= prev_loc_count:
# Add this item to the existing placement, and
@@ -156,7 +156,7 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
else:
unplaced_items.append(item_to_place)
continue
world.push_item(spot_to_fill, item_to_place, False)
multiworld.push_item(spot_to_fill, item_to_place, False)
spot_to_fill.locked = lock
placements.append(spot_to_fill)
spot_to_fill.event = item_to_place.advancement
@@ -173,7 +173,7 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
# validate all placements and remove invalid ones
state = sweep_from_pool(base_state, [])
for placement in placements:
if world.accessibility[placement.item.player] != "minimal" and not placement.can_reach(state):
if multiworld.accessibility[placement.item.player] != "minimal" and not placement.can_reach(state):
placement.item.location = None
unplaced_items.append(placement.item)
placement.item = None
@@ -188,7 +188,7 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
if excluded_locations:
for location in excluded_locations:
location.progress_type = location.progress_type.DEFAULT
fill_restrictive(world, base_state, excluded_locations, unplaced_items, single_player_placement, lock,
fill_restrictive(multiworld, base_state, excluded_locations, unplaced_items, single_player_placement, lock,
swap, on_place, allow_partial, False)
for location in excluded_locations:
if not location.item:
@@ -196,7 +196,7 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
if not allow_partial and len(unplaced_items) > 0 and len(locations) > 0:
# There are leftover unplaceable items and locations that won't accept them
if world.can_beat_game():
if multiworld.can_beat_game():
logging.warning(
f'Not all items placed. Game beatable anyway. (Could not place {unplaced_items})')
else:
@@ -206,7 +206,7 @@ def fill_restrictive(world: MultiWorld, base_state: CollectionState, locations:
item_pool.extend(unplaced_items)
def remaining_fill(world: MultiWorld,
def remaining_fill(multiworld: MultiWorld,
locations: typing.List[Location],
itempool: typing.List[Item]) -> None:
unplaced_items: typing.List[Item] = []
@@ -261,7 +261,7 @@ def remaining_fill(world: MultiWorld,
unplaced_items.append(item_to_place)
continue
world.push_item(spot_to_fill, item_to_place, False)
multiworld.push_item(spot_to_fill, item_to_place, False)
placements.append(spot_to_fill)
placed += 1
if not placed % 1000:
@@ -278,19 +278,19 @@ def remaining_fill(world: MultiWorld,
itempool.extend(unplaced_items)
def fast_fill(world: MultiWorld,
def fast_fill(multiworld: MultiWorld,
item_pool: typing.List[Item],
fill_locations: typing.List[Location]) -> typing.Tuple[typing.List[Item], typing.List[Location]]:
placing = min(len(item_pool), len(fill_locations))
for item, location in zip(item_pool, fill_locations):
world.push_item(location, item, False)
multiworld.push_item(location, item, False)
return item_pool[placing:], fill_locations[placing:]
def accessibility_corrections(world: MultiWorld, state: CollectionState, locations, pool=[]):
def accessibility_corrections(multiworld: MultiWorld, state: CollectionState, locations, pool=[]):
maximum_exploration_state = sweep_from_pool(state, pool)
minimal_players = {player for player in world.player_ids if world.worlds[player].options.accessibility == "minimal"}
unreachable_locations = [location for location in world.get_locations() if location.player in minimal_players and
minimal_players = {player for player in multiworld.player_ids if multiworld.worlds[player].options.accessibility == "minimal"}
unreachable_locations = [location for location in multiworld.get_locations() if location.player in minimal_players and
not location.can_reach(maximum_exploration_state)]
for location in unreachable_locations:
if (location.item is not None and location.item.advancement and location.address is not None and not
@@ -304,36 +304,36 @@ def accessibility_corrections(world: MultiWorld, state: CollectionState, locatio
locations.append(location)
if pool and locations:
locations.sort(key=lambda loc: loc.progress_type != LocationProgressType.PRIORITY)
fill_restrictive(world, state, locations, pool, name="Accessibility Corrections")
fill_restrictive(multiworld, state, locations, pool, name="Accessibility Corrections")
def inaccessible_location_rules(world: MultiWorld, state: CollectionState, locations):
def inaccessible_location_rules(multiworld: MultiWorld, state: CollectionState, locations):
maximum_exploration_state = sweep_from_pool(state)
unreachable_locations = [location for location in locations if not location.can_reach(maximum_exploration_state)]
if unreachable_locations:
def forbid_important_item_rule(item: Item):
return not ((item.classification & 0b0011) and world.worlds[item.player].options.accessibility != 'minimal')
return not ((item.classification & 0b0011) and multiworld.worlds[item.player].options.accessibility != 'minimal')
for location in unreachable_locations:
add_item_rule(location, forbid_important_item_rule)
def distribute_early_items(world: MultiWorld,
def distribute_early_items(multiworld: MultiWorld,
fill_locations: typing.List[Location],
itempool: typing.List[Item]) -> typing.Tuple[typing.List[Location], typing.List[Item]]:
""" returns new fill_locations and itempool """
early_items_count: typing.Dict[typing.Tuple[str, int], typing.List[int]] = {}
for player in world.player_ids:
items = itertools.chain(world.early_items[player], world.local_early_items[player])
for player in multiworld.player_ids:
items = itertools.chain(multiworld.early_items[player], multiworld.local_early_items[player])
for item in items:
early_items_count[item, player] = [world.early_items[player].get(item, 0),
world.local_early_items[player].get(item, 0)]
early_items_count[item, player] = [multiworld.early_items[player].get(item, 0),
multiworld.local_early_items[player].get(item, 0)]
if early_items_count:
early_locations: typing.List[Location] = []
early_priority_locations: typing.List[Location] = []
loc_indexes_to_remove: typing.Set[int] = set()
base_state = world.state.copy()
base_state.sweep_for_events(locations=(loc for loc in world.get_filled_locations() if loc.address is None))
base_state = multiworld.state.copy()
base_state.sweep_for_events(locations=(loc for loc in multiworld.get_filled_locations() if loc.address is None))
for i, loc in enumerate(fill_locations):
if loc.can_reach(base_state):
if loc.progress_type == LocationProgressType.PRIORITY:
@@ -345,8 +345,8 @@ def distribute_early_items(world: MultiWorld,
early_prog_items: typing.List[Item] = []
early_rest_items: typing.List[Item] = []
early_local_prog_items: typing.Dict[int, typing.List[Item]] = {player: [] for player in world.player_ids}
early_local_rest_items: typing.Dict[int, typing.List[Item]] = {player: [] for player in world.player_ids}
early_local_prog_items: typing.Dict[int, typing.List[Item]] = {player: [] for player in multiworld.player_ids}
early_local_rest_items: typing.Dict[int, typing.List[Item]] = {player: [] for player in multiworld.player_ids}
item_indexes_to_remove: typing.Set[int] = set()
for i, item in enumerate(itempool):
if (item.name, item.player) in early_items_count:
@@ -370,28 +370,28 @@ def distribute_early_items(world: MultiWorld,
if len(early_items_count) == 0:
break
itempool = [item for i, item in enumerate(itempool) if i not in item_indexes_to_remove]
for player in world.player_ids:
for player in multiworld.player_ids:
player_local = early_local_rest_items[player]
fill_restrictive(world, base_state,
fill_restrictive(multiworld, base_state,
[loc for loc in early_locations if loc.player == player],
player_local, lock=True, allow_partial=True, name=f"Local Early Items P{player}")
if player_local:
logging.warning(f"Could not fulfill rules of early items: {player_local}")
early_rest_items.extend(early_local_rest_items[player])
early_locations = [loc for loc in early_locations if not loc.item]
fill_restrictive(world, base_state, early_locations, early_rest_items, lock=True, allow_partial=True,
fill_restrictive(multiworld, base_state, early_locations, early_rest_items, lock=True, allow_partial=True,
name="Early Items")
early_locations += early_priority_locations
for player in world.player_ids:
for player in multiworld.player_ids:
player_local = early_local_prog_items[player]
fill_restrictive(world, base_state,
fill_restrictive(multiworld, base_state,
[loc for loc in early_locations if loc.player == player],
player_local, lock=True, allow_partial=True, name=f"Local Early Progression P{player}")
if player_local:
logging.warning(f"Could not fulfill rules of early items: {player_local}")
early_prog_items.extend(player_local)
early_locations = [loc for loc in early_locations if not loc.item]
fill_restrictive(world, base_state, early_locations, early_prog_items, lock=True, allow_partial=True,
fill_restrictive(multiworld, base_state, early_locations, early_prog_items, lock=True, allow_partial=True,
name="Early Progression")
unplaced_early_items = early_rest_items + early_prog_items
if unplaced_early_items:
@@ -400,18 +400,18 @@ def distribute_early_items(world: MultiWorld,
itempool += unplaced_early_items
fill_locations.extend(early_locations)
world.random.shuffle(fill_locations)
multiworld.random.shuffle(fill_locations)
return fill_locations, itempool
def distribute_items_restrictive(world: MultiWorld) -> None:
fill_locations = sorted(world.get_unfilled_locations())
world.random.shuffle(fill_locations)
def distribute_items_restrictive(multiworld: MultiWorld) -> None:
fill_locations = sorted(multiworld.get_unfilled_locations())
multiworld.random.shuffle(fill_locations)
# get items to distribute
itempool = sorted(world.itempool)
world.random.shuffle(itempool)
itempool = sorted(multiworld.itempool)
multiworld.random.shuffle(itempool)
fill_locations, itempool = distribute_early_items(world, fill_locations, itempool)
fill_locations, itempool = distribute_early_items(multiworld, fill_locations, itempool)
progitempool: typing.List[Item] = []
usefulitempool: typing.List[Item] = []
@@ -425,7 +425,7 @@ def distribute_items_restrictive(world: MultiWorld) -> None:
else:
filleritempool.append(item)
call_all(world, "fill_hook", progitempool, usefulitempool, filleritempool, fill_locations)
call_all(multiworld, "fill_hook", progitempool, usefulitempool, filleritempool, fill_locations)
locations: typing.Dict[LocationProgressType, typing.List[Location]] = {
loc_type: [] for loc_type in LocationProgressType}
@@ -446,34 +446,34 @@ def distribute_items_restrictive(world: MultiWorld) -> None:
if prioritylocations:
# "priority fill"
fill_restrictive(world, world.state, prioritylocations, progitempool, swap=False, on_place=mark_for_locking,
fill_restrictive(multiworld, multiworld.state, prioritylocations, progitempool, swap=False, on_place=mark_for_locking,
name="Priority")
accessibility_corrections(world, world.state, prioritylocations, progitempool)
accessibility_corrections(multiworld, multiworld.state, prioritylocations, progitempool)
defaultlocations = prioritylocations + defaultlocations
if progitempool:
# "advancement/progression fill"
fill_restrictive(world, world.state, defaultlocations, progitempool, name="Progression")
fill_restrictive(multiworld, multiworld.state, defaultlocations, progitempool, name="Progression")
if progitempool:
raise FillError(
f'Not enough locations for progress items. There are {len(progitempool)} more items than locations')
accessibility_corrections(world, world.state, defaultlocations)
accessibility_corrections(multiworld, multiworld.state, defaultlocations)
for location in lock_later:
if location.item:
location.locked = True
del mark_for_locking, lock_later
inaccessible_location_rules(world, world.state, defaultlocations)
inaccessible_location_rules(multiworld, multiworld.state, defaultlocations)
remaining_fill(world, excludedlocations, filleritempool)
remaining_fill(multiworld, excludedlocations, filleritempool)
if excludedlocations:
raise FillError(
f"Not enough filler items for excluded locations. There are {len(excludedlocations)} more locations than items")
restitempool = filleritempool + usefulitempool
remaining_fill(world, defaultlocations, restitempool)
remaining_fill(multiworld, defaultlocations, restitempool)
unplaced = restitempool
unfilled = defaultlocations
@@ -481,40 +481,40 @@ def distribute_items_restrictive(world: MultiWorld) -> None:
if unplaced or unfilled:
logging.warning(
f'Unplaced items({len(unplaced)}): {unplaced} - Unfilled Locations({len(unfilled)}): {unfilled}')
items_counter = Counter(location.item.player for location in world.get_locations() if location.item)
locations_counter = Counter(location.player for location in world.get_locations())
items_counter = Counter(location.item.player for location in multiworld.get_locations() if location.item)
locations_counter = Counter(location.player for location in multiworld.get_locations())
items_counter.update(item.player for item in unplaced)
locations_counter.update(location.player for location in unfilled)
print_data = {"items": items_counter, "locations": locations_counter}
logging.info(f'Per-Player counts: {print_data})')
def flood_items(world: MultiWorld) -> None:
def flood_items(multiworld: MultiWorld) -> None:
# get items to distribute
world.random.shuffle(world.itempool)
itempool = world.itempool
multiworld.random.shuffle(multiworld.itempool)
itempool = multiworld.itempool
progress_done = False
# sweep once to pick up preplaced items
world.state.sweep_for_events()
multiworld.state.sweep_for_events()
# fill world from top of itempool while we can
# fill multiworld from top of itempool while we can
while not progress_done:
location_list = world.get_unfilled_locations()
world.random.shuffle(location_list)
location_list = multiworld.get_unfilled_locations()
multiworld.random.shuffle(location_list)
spot_to_fill = None
for location in location_list:
if location.can_fill(world.state, itempool[0]):
if location.can_fill(multiworld.state, itempool[0]):
spot_to_fill = location
break
if spot_to_fill:
item = itempool.pop(0)
world.push_item(spot_to_fill, item, True)
multiworld.push_item(spot_to_fill, item, True)
continue
# ran out of spots, check if we need to step in and correct things
if len(world.get_reachable_locations()) == len(world.get_locations()):
if len(multiworld.get_reachable_locations()) == len(multiworld.get_locations()):
progress_done = True
continue
@@ -524,7 +524,7 @@ def flood_items(world: MultiWorld) -> None:
for item in itempool:
if item.advancement:
candidate_item_to_place = item
if world.unlocks_new_location(item):
if multiworld.unlocks_new_location(item):
item_to_place = item
break
@@ -537,15 +537,15 @@ def flood_items(world: MultiWorld) -> None:
raise FillError('No more progress items left to place.')
# find item to replace with progress item
location_list = world.get_reachable_locations()
world.random.shuffle(location_list)
location_list = multiworld.get_reachable_locations()
multiworld.random.shuffle(location_list)
for location in location_list:
if location.item is not None and not location.item.advancement:
# safe to replace
replace_item = location.item
replace_item.location = None
itempool.append(replace_item)
world.push_item(location, item_to_place, True)
multiworld.push_item(location, item_to_place, True)
itempool.remove(item_to_place)
break
@@ -755,7 +755,7 @@ def swap_location_item(location_1: Location, location_2: Location, check_locked:
location_1.event, location_2.event = location_2.event, location_1.event
def distribute_planned(world: MultiWorld) -> None:
def distribute_planned(multiworld: MultiWorld) -> None:
def warn(warning: str, force: typing.Union[bool, str]) -> None:
if force in [True, 'fail', 'failure', 'none', False, 'warn', 'warning']:
logging.warning(f'{warning}')
@@ -768,24 +768,24 @@ def distribute_planned(world: MultiWorld) -> None:
else:
warn(warning, force)
swept_state = world.state.copy()
swept_state = multiworld.state.copy()
swept_state.sweep_for_events()
reachable = frozenset(world.get_reachable_locations(swept_state))
reachable = frozenset(multiworld.get_reachable_locations(swept_state))
early_locations: typing.Dict[int, typing.List[str]] = collections.defaultdict(list)
non_early_locations: typing.Dict[int, typing.List[str]] = collections.defaultdict(list)
for loc in world.get_unfilled_locations():
for loc in multiworld.get_unfilled_locations():
if loc in reachable:
early_locations[loc.player].append(loc.name)
else: # not reachable with swept state
non_early_locations[loc.player].append(loc.name)
world_name_lookup = world.world_name_lookup
world_name_lookup = multiworld.world_name_lookup
block_value = typing.Union[typing.List[str], typing.Dict[str, typing.Any], str]
plando_blocks: typing.List[typing.Dict[str, typing.Any]] = []
player_ids = set(world.player_ids)
player_ids = set(multiworld.player_ids)
for player in player_ids:
for block in world.plando_items[player]:
for block in multiworld.plando_items[player]:
block['player'] = player
if 'force' not in block:
block['force'] = 'silent'
@@ -799,12 +799,12 @@ def distribute_planned(world: MultiWorld) -> None:
else:
target_world = block['world']
if target_world is False or world.players == 1: # target own world
if target_world is False or multiworld.players == 1: # target own world
worlds: typing.Set[int] = {player}
elif target_world is True: # target any worlds besides own
worlds = set(world.player_ids) - {player}
worlds = set(multiworld.player_ids) - {player}
elif target_world is None: # target all worlds
worlds = set(world.player_ids)
worlds = set(multiworld.player_ids)
elif type(target_world) == list: # list of target worlds
worlds = set()
for listed_world in target_world:
@@ -814,9 +814,9 @@ def distribute_planned(world: MultiWorld) -> None:
continue
worlds.add(world_name_lookup[listed_world])
elif type(target_world) == int: # target world by slot number
if target_world not in range(1, world.players + 1):
if target_world not in range(1, multiworld.players + 1):
failed(
f"Cannot place item in world {target_world} as it is not in range of (1, {world.players})",
f"Cannot place item in world {target_world} as it is not in range of (1, {multiworld.players})",
block['force'])
continue
worlds = {target_world}
@@ -844,7 +844,7 @@ def distribute_planned(world: MultiWorld) -> None:
item_list: typing.List[str] = []
for key, value in items.items():
if value is True:
value = world.itempool.count(world.worlds[player].create_item(key))
value = multiworld.itempool.count(multiworld.worlds[player].create_item(key))
item_list += [key] * value
items = item_list
if isinstance(items, str):
@@ -894,17 +894,17 @@ def distribute_planned(world: MultiWorld) -> None:
count = block['count']
failed(f"Plando count {count} greater than locations specified", block['force'])
block['count'] = len(block['locations'])
block['count']['target'] = world.random.randint(block['count']['min'], block['count']['max'])
block['count']['target'] = multiworld.random.randint(block['count']['min'], block['count']['max'])
if block['count']['target'] > 0:
plando_blocks.append(block)
# shuffle, but then sort blocks by number of locations minus number of items,
# so less-flexible blocks get priority
world.random.shuffle(plando_blocks)
multiworld.random.shuffle(plando_blocks)
plando_blocks.sort(key=lambda block: (len(block['locations']) - block['count']['target']
if len(block['locations']) > 0
else len(world.get_unfilled_locations(player)) - block['count']['target']))
else len(multiworld.get_unfilled_locations(player)) - block['count']['target']))
for placement in plando_blocks:
player = placement['player']
@@ -915,19 +915,19 @@ def distribute_planned(world: MultiWorld) -> None:
maxcount = placement['count']['target']
from_pool = placement['from_pool']
candidates = list(world.get_unfilled_locations_for_players(locations, sorted(worlds)))
world.random.shuffle(candidates)
world.random.shuffle(items)
candidates = list(multiworld.get_unfilled_locations_for_players(locations, sorted(worlds)))
multiworld.random.shuffle(candidates)
multiworld.random.shuffle(items)
count = 0
err: typing.List[str] = []
successful_pairs: typing.List[typing.Tuple[Item, Location]] = []
for item_name in items:
item = world.worlds[player].create_item(item_name)
item = multiworld.worlds[player].create_item(item_name)
for location in reversed(candidates):
if (location.address is None) == (item.code is None): # either both None or both not None
if not location.item:
if location.item_rule(item):
if location.can_fill(world.state, item, False):
if location.can_fill(multiworld.state, item, False):
successful_pairs.append((item, location))
candidates.remove(location)
count = count + 1
@@ -945,21 +945,21 @@ def distribute_planned(world: MultiWorld) -> None:
if count < placement['count']['min']:
m = placement['count']['min']
failed(
f"Plando block failed to place {m - count} of {m} item(s) for {world.player_name[player]}, error(s): {' '.join(err)}",
f"Plando block failed to place {m - count} of {m} item(s) for {multiworld.player_name[player]}, error(s): {' '.join(err)}",
placement['force'])
for (item, location) in successful_pairs:
world.push_item(location, item, collect=False)
multiworld.push_item(location, item, collect=False)
location.event = True # flag location to be checked during fill
location.locked = True
logging.debug(f"Plando placed {item} at {location}")
if from_pool:
try:
world.itempool.remove(item)
multiworld.itempool.remove(item)
except ValueError:
warn(
f"Could not remove {item} from pool for {world.player_name[player]} as it's already missing from it.",
f"Could not remove {item} from pool for {multiworld.player_name[player]} as it's already missing from it.",
placement['force'])
except Exception as e:
raise Exception(
f"Error running plando for player {player} ({world.player_name[player]})") from e
f"Error running plando for player {player} ({multiworld.player_name[player]})") from e