mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00
Core: Plando Items "Rewrite" (#3046)
This commit is contained in:
349
Fill.py
349
Fill.py
@@ -4,7 +4,7 @@ import logging
|
||||
import typing
|
||||
from collections import Counter, deque
|
||||
|
||||
from BaseClasses import CollectionState, Item, Location, LocationProgressType, MultiWorld
|
||||
from BaseClasses import CollectionState, Item, Location, LocationProgressType, MultiWorld, PlandoItemBlock
|
||||
from Options import Accessibility
|
||||
|
||||
from worlds.AutoWorld import call_all
|
||||
@@ -100,7 +100,7 @@ def fill_restrictive(multiworld: MultiWorld, base_state: CollectionState, locati
|
||||
# if minimal accessibility, only check whether location is reachable if game not beatable
|
||||
if multiworld.worlds[item_to_place.player].options.accessibility == Accessibility.option_minimal:
|
||||
perform_access_check = not multiworld.has_beaten_game(maximum_exploration_state,
|
||||
item_to_place.player) \
|
||||
item_to_place.player) \
|
||||
if single_player_placement else not has_beaten_game
|
||||
else:
|
||||
perform_access_check = True
|
||||
@@ -242,7 +242,7 @@ def remaining_fill(multiworld: MultiWorld,
|
||||
unplaced_items: typing.List[Item] = []
|
||||
placements: typing.List[Location] = []
|
||||
swapped_items: typing.Counter[typing.Tuple[int, str]] = Counter()
|
||||
total = min(len(itempool), len(locations))
|
||||
total = min(len(itempool), len(locations))
|
||||
placed = 0
|
||||
|
||||
# Optimisation: Decide whether to do full location.can_fill check (respect excluded), or only check the item rule
|
||||
@@ -343,8 +343,10 @@ def fast_fill(multiworld: MultiWorld,
|
||||
|
||||
def accessibility_corrections(multiworld: MultiWorld, state: CollectionState, locations, pool=[]):
|
||||
maximum_exploration_state = sweep_from_pool(state, pool)
|
||||
minimal_players = {player for player in multiworld.player_ids if multiworld.worlds[player].options.accessibility == "minimal"}
|
||||
unreachable_locations = [location for location in multiworld.get_locations() if location.player in minimal_players and
|
||||
minimal_players = {player for player in multiworld.player_ids if
|
||||
multiworld.worlds[player].options.accessibility == "minimal"}
|
||||
unreachable_locations = [location for location in multiworld.get_locations() if
|
||||
location.player in minimal_players and
|
||||
not location.can_reach(maximum_exploration_state)]
|
||||
for location in unreachable_locations:
|
||||
if (location.item is not None and location.item.advancement and location.address is not None and not
|
||||
@@ -365,7 +367,7 @@ def inaccessible_location_rules(multiworld: MultiWorld, state: CollectionState,
|
||||
unreachable_locations = [location for location in locations if not location.can_reach(maximum_exploration_state)]
|
||||
if unreachable_locations:
|
||||
def forbid_important_item_rule(item: Item):
|
||||
return not ((item.classification & 0b0011) and multiworld.worlds[item.player].options.accessibility != 'minimal')
|
||||
return not ((item.classification & 0b0011) and multiworld.worlds[item.player].options.accessibility != "minimal")
|
||||
|
||||
for location in unreachable_locations:
|
||||
add_item_rule(location, forbid_important_item_rule)
|
||||
@@ -677,9 +679,9 @@ def balance_multiworld_progression(multiworld: MultiWorld) -> None:
|
||||
if multiworld.worlds[player].options.progression_balancing > 0
|
||||
}
|
||||
if not balanceable_players:
|
||||
logging.info('Skipping multiworld progression balancing.')
|
||||
logging.info("Skipping multiworld progression balancing.")
|
||||
else:
|
||||
logging.info(f'Balancing multiworld progression for {len(balanceable_players)} Players.')
|
||||
logging.info(f"Balancing multiworld progression for {len(balanceable_players)} Players.")
|
||||
logging.debug(balanceable_players)
|
||||
state: CollectionState = CollectionState(multiworld)
|
||||
checked_locations: typing.Set[Location] = set()
|
||||
@@ -777,7 +779,7 @@ def balance_multiworld_progression(multiworld: MultiWorld) -> None:
|
||||
if player in threshold_percentages):
|
||||
break
|
||||
elif not balancing_sphere:
|
||||
raise RuntimeError('Not all required items reachable. Something went terribly wrong here.')
|
||||
raise RuntimeError("Not all required items reachable. Something went terribly wrong here.")
|
||||
# Gather a set of locations which we can swap items into
|
||||
unlocked_locations: typing.Dict[int, typing.Set[Location]] = collections.defaultdict(set)
|
||||
for l in unchecked_locations:
|
||||
@@ -793,8 +795,8 @@ def balance_multiworld_progression(multiworld: MultiWorld) -> None:
|
||||
testing = items_to_test.pop()
|
||||
reducing_state = state.copy()
|
||||
for location in itertools.chain((
|
||||
l for l in items_to_replace
|
||||
if l.item.player == player
|
||||
l for l in items_to_replace
|
||||
if l.item.player == player
|
||||
), items_to_test):
|
||||
reducing_state.collect(location.item, True, location)
|
||||
|
||||
@@ -867,52 +869,30 @@ def swap_location_item(location_1: Location, location_2: Location, check_locked:
|
||||
location_2.item.location = location_2
|
||||
|
||||
|
||||
def distribute_planned(multiworld: MultiWorld) -> None:
|
||||
def warn(warning: str, force: typing.Union[bool, str]) -> None:
|
||||
if force in [True, 'fail', 'failure', 'none', False, 'warn', 'warning']:
|
||||
logging.warning(f'{warning}')
|
||||
def parse_planned_blocks(multiworld: MultiWorld) -> dict[int, list[PlandoItemBlock]]:
|
||||
def warn(warning: str, force: bool | str) -> None:
|
||||
if isinstance(force, bool):
|
||||
logging.warning(f"{warning}")
|
||||
else:
|
||||
logging.debug(f'{warning}')
|
||||
logging.debug(f"{warning}")
|
||||
|
||||
def failed(warning: str, force: typing.Union[bool, str]) -> None:
|
||||
if force in [True, 'fail', 'failure']:
|
||||
def failed(warning: str, force: bool | str) -> None:
|
||||
if force is True:
|
||||
raise Exception(warning)
|
||||
else:
|
||||
warn(warning, force)
|
||||
|
||||
swept_state = multiworld.state.copy()
|
||||
swept_state.sweep_for_advancements()
|
||||
reachable = frozenset(multiworld.get_reachable_locations(swept_state))
|
||||
early_locations: typing.Dict[int, typing.List[str]] = collections.defaultdict(list)
|
||||
non_early_locations: typing.Dict[int, typing.List[str]] = collections.defaultdict(list)
|
||||
for loc in multiworld.get_unfilled_locations():
|
||||
if loc in reachable:
|
||||
early_locations[loc.player].append(loc.name)
|
||||
else: # not reachable with swept state
|
||||
non_early_locations[loc.player].append(loc.name)
|
||||
|
||||
world_name_lookup = multiworld.world_name_lookup
|
||||
|
||||
block_value = typing.Union[typing.List[str], typing.Dict[str, typing.Any], str]
|
||||
plando_blocks: typing.List[typing.Dict[str, typing.Any]] = []
|
||||
player_ids = set(multiworld.player_ids)
|
||||
plando_blocks: dict[int, list[PlandoItemBlock]] = dict()
|
||||
player_ids: set[int] = set(multiworld.player_ids)
|
||||
for player in player_ids:
|
||||
for block in multiworld.plando_items[player]:
|
||||
block['player'] = player
|
||||
if 'force' not in block:
|
||||
block['force'] = 'silent'
|
||||
if 'from_pool' not in block:
|
||||
block['from_pool'] = True
|
||||
elif not isinstance(block['from_pool'], bool):
|
||||
from_pool_type = type(block['from_pool'])
|
||||
raise Exception(f'Plando "from_pool" has to be boolean, not {from_pool_type} for player {player}.')
|
||||
if 'world' not in block:
|
||||
target_world = False
|
||||
else:
|
||||
target_world = block['world']
|
||||
|
||||
plando_blocks[player] = []
|
||||
for block in multiworld.worlds[player].options.plando_items:
|
||||
new_block: PlandoItemBlock = PlandoItemBlock(player, block.from_pool, block.force)
|
||||
target_world = block.world
|
||||
if target_world is False or multiworld.players == 1: # target own world
|
||||
worlds: typing.Set[int] = {player}
|
||||
worlds: set[int] = {player}
|
||||
elif target_world is True: # target any worlds besides own
|
||||
worlds = set(multiworld.player_ids) - {player}
|
||||
elif target_world is None: # target all worlds
|
||||
@@ -922,172 +902,197 @@ def distribute_planned(multiworld: MultiWorld) -> None:
|
||||
for listed_world in target_world:
|
||||
if listed_world not in world_name_lookup:
|
||||
failed(f"Cannot place item to {target_world}'s world as that world does not exist.",
|
||||
block['force'])
|
||||
block.force)
|
||||
continue
|
||||
worlds.add(world_name_lookup[listed_world])
|
||||
elif type(target_world) == int: # target world by slot number
|
||||
if target_world not in range(1, multiworld.players + 1):
|
||||
failed(
|
||||
f"Cannot place item in world {target_world} as it is not in range of (1, {multiworld.players})",
|
||||
block['force'])
|
||||
block.force)
|
||||
continue
|
||||
worlds = {target_world}
|
||||
else: # target world by slot name
|
||||
if target_world not in world_name_lookup:
|
||||
failed(f"Cannot place item to {target_world}'s world as that world does not exist.",
|
||||
block['force'])
|
||||
block.force)
|
||||
continue
|
||||
worlds = {world_name_lookup[target_world]}
|
||||
block['world'] = worlds
|
||||
new_block.worlds = worlds
|
||||
|
||||
items: block_value = []
|
||||
if "items" in block:
|
||||
items = block["items"]
|
||||
if 'count' not in block:
|
||||
block['count'] = False
|
||||
elif "item" in block:
|
||||
items = block["item"]
|
||||
if 'count' not in block:
|
||||
block['count'] = 1
|
||||
else:
|
||||
failed("You must specify at least one item to place items with plando.", block['force'])
|
||||
continue
|
||||
items: list[str] | dict[str, typing.Any] = block.items
|
||||
if isinstance(items, dict):
|
||||
item_list: typing.List[str] = []
|
||||
item_list: list[str] = []
|
||||
for key, value in items.items():
|
||||
if value is True:
|
||||
value = multiworld.itempool.count(multiworld.worlds[player].create_item(key))
|
||||
item_list += [key] * value
|
||||
items = item_list
|
||||
if isinstance(items, str):
|
||||
items = [items]
|
||||
block['items'] = items
|
||||
new_block.items = items
|
||||
|
||||
locations: block_value = []
|
||||
if 'location' in block:
|
||||
locations = block['location'] # just allow 'location' to keep old yamls compatible
|
||||
elif 'locations' in block:
|
||||
locations = block['locations']
|
||||
locations: list[str] = block.locations
|
||||
if isinstance(locations, str):
|
||||
locations = [locations]
|
||||
|
||||
if isinstance(locations, dict):
|
||||
location_list = []
|
||||
for key, value in locations.items():
|
||||
location_list += [key] * value
|
||||
locations = location_list
|
||||
locations_from_groups: list[str] = []
|
||||
resolved_locations: list[Location] = []
|
||||
for target_player in worlds:
|
||||
world_locations = multiworld.get_unfilled_locations(target_player)
|
||||
for group in multiworld.worlds[target_player].location_name_groups:
|
||||
if group in locations:
|
||||
locations_from_groups.extend(multiworld.worlds[target_player].location_name_groups[group])
|
||||
resolved_locations.extend(location for location in world_locations
|
||||
if location.name in [*locations, *locations_from_groups])
|
||||
new_block.locations = sorted(dict.fromkeys(locations))
|
||||
new_block.resolved_locations = sorted(set(resolved_locations))
|
||||
|
||||
count = block.count
|
||||
if not count:
|
||||
count = len(new_block.items)
|
||||
if isinstance(count, int):
|
||||
count = {"min": count, "max": count}
|
||||
if "min" not in count:
|
||||
count["min"] = 0
|
||||
if "max" not in count:
|
||||
count["max"] = len(new_block.items)
|
||||
|
||||
new_block.count = count
|
||||
plando_blocks[player].append(new_block)
|
||||
|
||||
return plando_blocks
|
||||
|
||||
|
||||
def resolve_early_locations_for_planned(multiworld: MultiWorld):
|
||||
def warn(warning: str, force: bool | str) -> None:
|
||||
if isinstance(force, bool):
|
||||
logging.warning(f"{warning}")
|
||||
else:
|
||||
logging.debug(f"{warning}")
|
||||
|
||||
def failed(warning: str, force: bool | str) -> None:
|
||||
if force is True:
|
||||
raise Exception(warning)
|
||||
else:
|
||||
warn(warning, force)
|
||||
|
||||
swept_state = multiworld.state.copy()
|
||||
swept_state.sweep_for_advancements()
|
||||
reachable = frozenset(multiworld.get_reachable_locations(swept_state))
|
||||
early_locations: dict[int, list[Location]] = collections.defaultdict(list)
|
||||
non_early_locations: dict[int, list[Location]] = collections.defaultdict(list)
|
||||
for loc in multiworld.get_unfilled_locations():
|
||||
if loc in reachable:
|
||||
early_locations[loc.player].append(loc)
|
||||
else: # not reachable with swept state
|
||||
non_early_locations[loc.player].append(loc)
|
||||
|
||||
for player in multiworld.plando_item_blocks:
|
||||
removed = []
|
||||
for block in multiworld.plando_item_blocks[player]:
|
||||
locations = block.locations
|
||||
resolved_locations = block.resolved_locations
|
||||
worlds = block.worlds
|
||||
if "early_locations" in locations:
|
||||
locations.remove("early_locations")
|
||||
for target_player in worlds:
|
||||
locations += early_locations[target_player]
|
||||
resolved_locations += early_locations[target_player]
|
||||
if "non_early_locations" in locations:
|
||||
locations.remove("non_early_locations")
|
||||
for target_player in worlds:
|
||||
locations += non_early_locations[target_player]
|
||||
resolved_locations += non_early_locations[target_player]
|
||||
|
||||
block['locations'] = list(dict.fromkeys(locations))
|
||||
if block.count["max"] > len(block.items):
|
||||
count = block.count["max"]
|
||||
failed(f"Plando count {count} greater than items specified", block.force)
|
||||
block.count["max"] = len(block.items)
|
||||
if block.count["min"] > len(block.items):
|
||||
block.count["min"] = len(block.items)
|
||||
if block.count["max"] > len(block.resolved_locations) > 0:
|
||||
count = block.count["max"]
|
||||
failed(f"Plando count {count} greater than locations specified", block.force)
|
||||
block.count["max"] = len(block.resolved_locations)
|
||||
if block.count["min"] > len(block.resolved_locations):
|
||||
block.count["min"] = len(block.resolved_locations)
|
||||
block.count["target"] = multiworld.random.randint(block.count["min"],
|
||||
block.count["max"])
|
||||
|
||||
if not block['count']:
|
||||
block['count'] = (min(len(block['items']), len(block['locations'])) if
|
||||
len(block['locations']) > 0 else len(block['items']))
|
||||
if isinstance(block['count'], int):
|
||||
block['count'] = {'min': block['count'], 'max': block['count']}
|
||||
if 'min' not in block['count']:
|
||||
block['count']['min'] = 0
|
||||
if 'max' not in block['count']:
|
||||
block['count']['max'] = (min(len(block['items']), len(block['locations'])) if
|
||||
len(block['locations']) > 0 else len(block['items']))
|
||||
if block['count']['max'] > len(block['items']):
|
||||
count = block['count']
|
||||
failed(f"Plando count {count} greater than items specified", block['force'])
|
||||
block['count'] = len(block['items'])
|
||||
if block['count']['max'] > len(block['locations']) > 0:
|
||||
count = block['count']
|
||||
failed(f"Plando count {count} greater than locations specified", block['force'])
|
||||
block['count'] = len(block['locations'])
|
||||
block['count']['target'] = multiworld.random.randint(block['count']['min'], block['count']['max'])
|
||||
if not block.count["target"]:
|
||||
removed.append(block)
|
||||
|
||||
if block['count']['target'] > 0:
|
||||
plando_blocks.append(block)
|
||||
for block in removed:
|
||||
multiworld.plando_item_blocks[player].remove(block)
|
||||
|
||||
|
||||
def distribute_planned_blocks(multiworld: MultiWorld, plando_blocks: list[PlandoItemBlock]):
|
||||
def warn(warning: str, force: bool | str) -> None:
|
||||
if isinstance(force, bool):
|
||||
logging.warning(f"{warning}")
|
||||
else:
|
||||
logging.debug(f"{warning}")
|
||||
|
||||
def failed(warning: str, force: bool | str) -> None:
|
||||
if force is True:
|
||||
raise Exception(warning)
|
||||
else:
|
||||
warn(warning, force)
|
||||
|
||||
# shuffle, but then sort blocks by number of locations minus number of items,
|
||||
# so less-flexible blocks get priority
|
||||
multiworld.random.shuffle(plando_blocks)
|
||||
plando_blocks.sort(key=lambda block: (len(block['locations']) - block['count']['target']
|
||||
if len(block['locations']) > 0
|
||||
else len(multiworld.get_unfilled_locations(player)) - block['count']['target']))
|
||||
|
||||
plando_blocks.sort(key=lambda block: (len(block.resolved_locations) - block.count["target"]
|
||||
if len(block.resolved_locations) > 0
|
||||
else len(multiworld.get_unfilled_locations(block.player)) -
|
||||
block.count["target"]))
|
||||
for placement in plando_blocks:
|
||||
player = placement['player']
|
||||
player = placement.player
|
||||
try:
|
||||
worlds = placement['world']
|
||||
locations = placement['locations']
|
||||
items = placement['items']
|
||||
maxcount = placement['count']['target']
|
||||
from_pool = placement['from_pool']
|
||||
worlds = placement.worlds
|
||||
locations = placement.resolved_locations
|
||||
items = placement.items
|
||||
maxcount = placement.count["target"]
|
||||
from_pool = placement.from_pool
|
||||
|
||||
candidates = list(multiworld.get_unfilled_locations_for_players(locations, sorted(worlds)))
|
||||
multiworld.random.shuffle(candidates)
|
||||
multiworld.random.shuffle(items)
|
||||
count = 0
|
||||
err: typing.List[str] = []
|
||||
successful_pairs: typing.List[typing.Tuple[int, Item, Location]] = []
|
||||
claimed_indices: typing.Set[typing.Optional[int]] = set()
|
||||
for item_name in items:
|
||||
index_to_delete: typing.Optional[int] = None
|
||||
if from_pool:
|
||||
try:
|
||||
# If from_pool, try to find an existing item with this name & player in the itempool and use it
|
||||
index_to_delete, item = next(
|
||||
(i, item) for i, item in enumerate(multiworld.itempool)
|
||||
if item.player == player and item.name == item_name and i not in claimed_indices
|
||||
)
|
||||
except StopIteration:
|
||||
warn(
|
||||
f"Could not remove {item_name} from pool for {multiworld.player_name[player]} as it's already missing from it.",
|
||||
placement['force'])
|
||||
item = multiworld.worlds[player].create_item(item_name)
|
||||
else:
|
||||
item = multiworld.worlds[player].create_item(item_name)
|
||||
|
||||
for location in reversed(candidates):
|
||||
if (location.address is None) == (item.code is None): # either both None or both not None
|
||||
if not location.item:
|
||||
if location.item_rule(item):
|
||||
if location.can_fill(multiworld.state, item, False):
|
||||
successful_pairs.append((index_to_delete, item, location))
|
||||
claimed_indices.add(index_to_delete)
|
||||
candidates.remove(location)
|
||||
count = count + 1
|
||||
break
|
||||
else:
|
||||
err.append(f"Can't place item at {location} due to fill condition not met.")
|
||||
else:
|
||||
err.append(f"{item_name} not allowed at {location}.")
|
||||
else:
|
||||
err.append(f"Cannot place {item_name} into already filled location {location}.")
|
||||
item_candidates = []
|
||||
if from_pool:
|
||||
instances = [item for item in multiworld.itempool if item.player == player and item.name in items]
|
||||
for item in multiworld.random.sample(items, maxcount):
|
||||
candidate = next((i for i in instances if i.name == item), None)
|
||||
if candidate is None:
|
||||
warn(f"Could not remove {item} from pool for {multiworld.player_name[player]} as "
|
||||
f"it's already missing from it", placement.force)
|
||||
candidate = multiworld.worlds[player].create_item(item)
|
||||
else:
|
||||
err.append(f"Mismatch between {item_name} and {location}, only one is an event.")
|
||||
|
||||
if count == maxcount:
|
||||
break
|
||||
if count < placement['count']['min']:
|
||||
m = placement['count']['min']
|
||||
failed(
|
||||
f"Plando block failed to place {m - count} of {m} item(s) for {multiworld.player_name[player]}, error(s): {' '.join(err)}",
|
||||
placement['force'])
|
||||
|
||||
# Sort indices in reverse so we can remove them one by one
|
||||
successful_pairs = sorted(successful_pairs, key=lambda successful_pair: successful_pair[0] or 0, reverse=True)
|
||||
|
||||
for (index, item, location) in successful_pairs:
|
||||
multiworld.push_item(location, item, collect=False)
|
||||
location.locked = True
|
||||
logging.debug(f"Plando placed {item} at {location}")
|
||||
if index is not None: # If this item is from_pool and was found in the pool, remove it.
|
||||
multiworld.itempool.pop(index)
|
||||
multiworld.itempool.remove(candidate)
|
||||
instances.remove(candidate)
|
||||
item_candidates.append(candidate)
|
||||
else:
|
||||
item_candidates = [multiworld.worlds[player].create_item(item)
|
||||
for item in multiworld.random.sample(items, maxcount)]
|
||||
if any(item.code is None for item in item_candidates) \
|
||||
and not all(item.code is None for item in item_candidates):
|
||||
failed(f"Plando block for player {player} ({multiworld.player_name[player]}) contains both "
|
||||
f"event items and non-event items. "
|
||||
f"Event items: {[item for item in item_candidates if item.code is None]}, "
|
||||
f"Non-event items: {[item for item in item_candidates if item.code is not None]}",
|
||||
placement.force)
|
||||
continue
|
||||
else:
|
||||
is_real = item_candidates[0].code is not None
|
||||
candidates = [candidate for candidate in locations if candidate.item is None
|
||||
and bool(candidate.address) == is_real]
|
||||
multiworld.random.shuffle(candidates)
|
||||
allstate = multiworld.get_all_state(False)
|
||||
mincount = placement.count["min"]
|
||||
allowed_margin = len(item_candidates) - mincount
|
||||
fill_restrictive(multiworld, allstate, candidates, item_candidates, lock=True,
|
||||
allow_partial=True, name="Plando Main Fill")
|
||||
|
||||
if len(item_candidates) > allowed_margin:
|
||||
failed(f"Could not place {len(item_candidates)} "
|
||||
f"of {mincount + allowed_margin} item(s) "
|
||||
f"for {multiworld.player_name[player]}, "
|
||||
f"remaining items: {item_candidates}",
|
||||
placement.force)
|
||||
if from_pool:
|
||||
multiworld.itempool.extend([item for item in item_candidates if item.code is not None])
|
||||
except Exception as e:
|
||||
raise Exception(
|
||||
f"Error running plando for player {player} ({multiworld.player_name[player]})") from e
|
||||
|
Reference in New Issue
Block a user