diff --git a/BaseClasses.py b/BaseClasses.py index ba078686..10d05406 100644 --- a/BaseClasses.py +++ b/BaseClasses.py @@ -7,11 +7,11 @@ import random import secrets import warnings from argparse import Namespace -from collections import Counter, deque +from collections import Counter, deque, defaultdict from collections.abc import Collection, MutableSequence from enum import IntEnum, IntFlag from typing import (AbstractSet, Any, Callable, ClassVar, Dict, Iterable, Iterator, List, Literal, Mapping, NamedTuple, - Optional, Protocol, Set, Tuple, Union, TYPE_CHECKING) + Optional, Protocol, Set, Tuple, Union, TYPE_CHECKING, Literal, overload) import dataclasses from typing_extensions import NotRequired, TypedDict @@ -585,26 +585,9 @@ class MultiWorld(): if self.has_beaten_game(state): return True - base_locations = self.get_locations() if locations is None else locations - prog_locations = {location for location in base_locations if location.item - and location.item.advancement and location not in state.locations_checked} - - while prog_locations: - sphere: Set[Location] = set() - # build up spheres of collection radius. - # Everything in each sphere is independent from each other in dependencies and only depends on lower spheres - for location in prog_locations: - if location.can_reach(state): - sphere.add(location) - - if not sphere: - # ran out of places and did not finish yet, quit - return False - - for location in sphere: - state.collect(location.item, True, location) - prog_locations -= sphere - + for _ in state.sweep_for_advancements(locations, + yield_each_sweep=True, + checked_locations=state.locations_checked): if self.has_beaten_game(state): return True @@ -889,20 +872,133 @@ class CollectionState(): "Please switch over to sweep_for_advancements.") return self.sweep_for_advancements(locations) - def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None) -> None: - if locations is None: - locations = self.multiworld.get_filled_locations() - reachable_advancements = True - # since the loop has a good chance to run more than once, only filter the advancements once - locations = {location for location in locations if location.advancement and location not in self.advancements} + def _sweep_for_advancements_impl(self, advancements_per_player: List[Tuple[int, List[Location]]], + yield_each_sweep: bool) -> Iterator[None]: + """ + The implementation for sweep_for_advancements is separated here because it returns a generator due to the use + of a yield statement. + """ + all_players = {player for player, _ in advancements_per_player} + players_to_check = all_players + # As an optimization, it is assumed that each player's world only logically depends on itself. However, worlds + # are allowed to logically depend on other worlds, so once there are no more players that should be checked + # under this assumption, an extra sweep iteration is performed that checks every player, to confirm that the + # sweep is finished. + checking_if_finished = False + while players_to_check: + next_advancements_per_player: List[Tuple[int, List[Location]]] = [] + next_players_to_check = set() - while reachable_advancements: - reachable_advancements = {location for location in locations if location.can_reach(self)} - locations -= reachable_advancements - for advancement in reachable_advancements: - self.advancements.add(advancement) - assert isinstance(advancement.item, Item), "tried to collect Event with no Item" - self.collect(advancement.item, True, advancement) + for player, locations in advancements_per_player: + if player not in players_to_check: + next_advancements_per_player.append((player, locations)) + continue + + # Accessibility of each location is checked first because a player's region accessibility cache becomes + # stale whenever one of their own items is collected into the state. + reachable_locations: List[Location] = [] + unreachable_locations: List[Location] = [] + for location in locations: + if location.can_reach(self): + # Locations containing items that do not belong to `player` could be collected immediately + # because they won't stale `player`'s region accessibility cache, but, for simplicity, all the + # items at reachable locations are collected in a single loop. + reachable_locations.append(location) + else: + unreachable_locations.append(location) + if unreachable_locations: + next_advancements_per_player.append((player, unreachable_locations)) + + # A previous player's locations processed in the current `while players_to_check` iteration could have + # collected items belonging to `player`, but now that all of `player`'s reachable locations have been + # found, it can be assumed that `player` will not gain any more reachable locations until another one of + # their items is collected. + # It would be clearer to not add players to `next_players_to_check` in the first place if they have yet + # to be processed in the current `while players_to_check` iteration, but checking if a player should be + # added to `next_players_to_check` would need to be run once for every item that is collected, so it is + # more performant to instead discard `player` from `next_players_to_check` once their locations have + # been processed. + next_players_to_check.discard(player) + + # Collect the items from the reachable locations. + for advancement in reachable_locations: + self.advancements.add(advancement) + item = advancement.item + assert isinstance(item, Item), "tried to collect advancement Location with no Item" + if self.collect(item, True, advancement): + # The player the item belongs to may be able to reach additional locations in the next sweep + # iteration. + next_players_to_check.add(item.player) + + if not next_players_to_check: + if not checking_if_finished: + # It is assumed that each player's world only logically depends on itself, which may not be the + # case, so confirm that the sweep is finished by doing an extra iteration that checks every player. + checking_if_finished = True + next_players_to_check = all_players + else: + checking_if_finished = False + + players_to_check = next_players_to_check + advancements_per_player = next_advancements_per_player + + if yield_each_sweep: + yield + + @overload + def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None, *, + yield_each_sweep: Literal[True], + checked_locations: Optional[Set[Location]] = None) -> Iterator[None]: ... + + @overload + def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None, + yield_each_sweep: Literal[False] = False, + checked_locations: Optional[Set[Location]] = None) -> None: ... + + def sweep_for_advancements(self, locations: Optional[Iterable[Location]] = None, yield_each_sweep: bool = False, + checked_locations: Optional[Set[Location]] = None) -> Optional[Iterator[None]]: + """ + Sweep through the locations that contain uncollected advancement items, collecting the items into the state + until there are no more reachable locations that contain uncollected advancement items. + + :param locations: The locations to sweep through, defaulting to all locations in the multiworld. + :param yield_each_sweep: When True, return a generator that yields at the end of each sweep iteration. + :param checked_locations: Optional override of locations to filter out from the locations argument, defaults to + self.advancements when None. + """ + if checked_locations is None: + checked_locations = self.advancements + + # Since the sweep loop usually performs many iterations, the locations are filtered in advance. + # A list of tuples is used, instead of a dictionary, because it is faster to iterate. + advancements_per_player: List[Tuple[int, List[Location]]] + if locations is None: + # `location.advancement` can only be True for filled locations, so unfilled locations are filtered out. + advancements_per_player = [] + for player, locations_dict in self.multiworld.regions.location_cache.items(): + filtered_locations = [location for location in locations_dict.values() + if location.advancement and location not in checked_locations] + if filtered_locations: + advancements_per_player.append((player, filtered_locations)) + else: + # Filter and separate the locations into a list for each player. + advancements_per_player_dict: Dict[int, List[Location]] = defaultdict(list) + for location in locations: + if location.advancement and location not in checked_locations: + advancements_per_player_dict[location.player].append(location) + # Convert to a list of tuples. + advancements_per_player = list(advancements_per_player_dict.items()) + del advancements_per_player_dict + + if yield_each_sweep: + # Return a generator that will yield at the end of each sweep iteration. + return self._sweep_for_advancements_impl(advancements_per_player, True) + else: + # Create the generator, but tell it not to yield anything, so it will run to completion in zero iterations + # once started, then start and exhaust the generator by attempting to iterate it. + for _ in self._sweep_for_advancements_impl(advancements_per_player, False): + assert False, "Generator yielded when it should have run to completion without yielding" + return None # item name related def has(self, item: str, player: int, count: int = 1) -> bool: