mirror of
				https://github.com/MarioSpore/Grinch-AP.git
				synced 2025-10-21 20:21:32 -06:00 
			
		
		
		
	 96d544ac84
			
		
	
	96d544ac84
	
	
	
		
			
			In some cases multi-thousand element lists were created in-memory with near identical contents, per player, then discarded and reassembled. Was testing against a case with 3 GB of additional memory use (50 players) which appeared to get stuck, but really was just very slow. This example case is fixed with these changes. Additionally, progression balancing is now run after ShopSlotFill, so it is now "aware" of the changed progression shops can produce. As well, special handling for keys was removed, as not all games will have the notion of keys.
		
			
				
	
	
		
			426 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			426 lines
		
	
	
		
			21 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import typing
 | |
| import collections
 | |
| import itertools
 | |
| 
 | |
| from BaseClasses import CollectionState, PlandoItem, Location
 | |
| from Items import ItemFactory
 | |
| from Regions import key_drop_data
 | |
| 
 | |
| 
 | |
| class FillError(RuntimeError):
 | |
|     pass
 | |
| 
 | |
| 
 | |
| def fill_restrictive(world, base_state: CollectionState, locations, itempool, single_player_placement=False,
 | |
|                      lock=False):
 | |
|     def sweep_from_pool():
 | |
|         new_state = base_state.copy()
 | |
|         for item in itempool:
 | |
|             new_state.collect(item, True)
 | |
|         new_state.sweep_for_events()
 | |
|         return new_state
 | |
| 
 | |
|     unplaced_items = []
 | |
|     placements = []
 | |
| 
 | |
|     no_access_checks = {}
 | |
|     reachable_items = {}
 | |
|     for item in itempool:
 | |
|         if world.accessibility[item.player] == 'none':
 | |
|             no_access_checks.setdefault(item.player, []).append(item)
 | |
|         else:
 | |
|             reachable_items.setdefault(item.player, []).append(item)
 | |
| 
 | |
|     for player_items in [no_access_checks, reachable_items]:
 | |
|         while any(player_items.values()) and locations:
 | |
|             items_to_place = [[itempool.remove(items[-1]), items.pop()][-1] for items in player_items.values() if items]
 | |
| 
 | |
|             maximum_exploration_state = sweep_from_pool()
 | |
|             has_beaten_game = world.has_beaten_game(maximum_exploration_state)
 | |
| 
 | |
|             for item_to_place in items_to_place:
 | |
|                 perform_access_check = True
 | |
|                 if world.accessibility[item_to_place.player] == 'none':
 | |
|                     perform_access_check = not world.has_beaten_game(maximum_exploration_state, item_to_place.player) if single_player_placement else not has_beaten_game
 | |
|                 for location in locations:
 | |
|                     if (not single_player_placement or location.player == item_to_place.player)\
 | |
|                             and location.can_fill(maximum_exploration_state, item_to_place, perform_access_check):
 | |
|                         spot_to_fill = location
 | |
|                         break
 | |
| 
 | |
|                 else:
 | |
|                     # we filled all reachable spots. Maybe the game can be beaten anyway?
 | |
|                     unplaced_items.insert(0, item_to_place)
 | |
|                     if world.accessibility[item_to_place.player] != 'none' and world.can_beat_game():
 | |
|                         logging.warning(
 | |
|                             f'Not all items placed. Game beatable anyway. (Could not place {item_to_place})')
 | |
|                         continue
 | |
|                     # fill in name of world for item
 | |
|                     item_to_place.world = world
 | |
|                     raise FillError(f'No more spots to place {item_to_place}, locations {locations} are invalid. '
 | |
|                                     f'Already placed {len(placements)}: {", ".join(str(place) for place in placements)}')
 | |
| 
 | |
|                 world.push_item(spot_to_fill, item_to_place, False)
 | |
|                 if lock:
 | |
|                     spot_to_fill.locked = True
 | |
|                 locations.remove(spot_to_fill)
 | |
|                 placements.append(spot_to_fill)
 | |
|                 spot_to_fill.event = True
 | |
| 
 | |
|     itempool.extend(unplaced_items)
 | |
| 
 | |
| def distribute_items_restrictive(world, gftower_trash=False, fill_locations=None):
 | |
|     # If not passed in, then get a shuffled list of locations to fill in
 | |
|     if not fill_locations:
 | |
|         fill_locations = world.get_unfilled_locations()
 | |
|         world.random.shuffle(fill_locations)
 | |
| 
 | |
|     # get items to distribute
 | |
|     world.random.shuffle(world.itempool)
 | |
|     progitempool = []
 | |
|     localrestitempool = {player: [] for player in range(1, world.players + 1)}
 | |
|     restitempool = []
 | |
| 
 | |
|     for item in world.itempool:
 | |
|         if item.advancement:
 | |
|             progitempool.append(item)
 | |
|         elif item.name in world.local_items[item.player]:
 | |
|             localrestitempool[item.player].append(item)
 | |
|         else:
 | |
|             restitempool.append(item)
 | |
| 
 | |
|     # fill in gtower locations with trash first
 | |
|     for player in range(1, world.players + 1):
 | |
|         if not gftower_trash or not world.ganonstower_vanilla[player] or \
 | |
|                 world.logic[player] in {'owglitches', "nologic"}:
 | |
|             gtower_trash_count = 0
 | |
|         elif 'triforcehunt' in world.goal[player] and ('local' in world.goal[player] or world.players == 1):
 | |
|             gtower_trash_count = world.random.randint(world.crystals_needed_for_gt[player] * 2,
 | |
|                                                       world.crystals_needed_for_gt[player] * 4)
 | |
|         else:
 | |
|             gtower_trash_count = world.random.randint(0, world.crystals_needed_for_gt[player] * 2)
 | |
| 
 | |
|         if gtower_trash_count:
 | |
|             gtower_locations = [location for location in fill_locations if
 | |
|                                 'Ganons Tower' in location.name and location.player == player]
 | |
|             world.random.shuffle(gtower_locations)
 | |
|             trashcnt = 0
 | |
|             localrest = localrestitempool[player]
 | |
|             if localrest:
 | |
|                 gt_item_pool = restitempool + localrest
 | |
|                 world.random.shuffle(gt_item_pool)
 | |
|             else:
 | |
|                 gt_item_pool = restitempool.copy()
 | |
| 
 | |
|             while gtower_locations and gt_item_pool and trashcnt < gtower_trash_count:
 | |
|                 spot_to_fill = gtower_locations.pop()
 | |
|                 item_to_place = gt_item_pool.pop()
 | |
|                 if item_to_place in localrest:
 | |
|                     localrest.remove(item_to_place)
 | |
|                 else:
 | |
|                     restitempool.remove(item_to_place)
 | |
|                 world.push_item(spot_to_fill, item_to_place, False)
 | |
|                 fill_locations.remove(spot_to_fill)
 | |
|                 trashcnt += 1
 | |
| 
 | |
|     world.random.shuffle(fill_locations)
 | |
| 
 | |
|     # Make sure the escape small key is placed first in standard with key shuffle to prevent running out of spots
 | |
|     standard_keyshuffle_players = {player for player, mode in world.mode.items() if mode == 'standard' and
 | |
|                                    world.keyshuffle[player] is True}
 | |
|     if standard_keyshuffle_players:
 | |
|         progitempool.sort(
 | |
|             key=lambda item: 1 if item.name == 'Small Key (Hyrule Castle)' and
 | |
|                                   item.player in standard_keyshuffle_players else 0)
 | |
| 
 | |
|     fill_restrictive(world, world.state, fill_locations, progitempool)
 | |
| 
 | |
|     if any(localrestitempool.values()):  # we need to make sure some fills are limited to certain worlds
 | |
|         local_locations = {player: [] for player in world.player_ids}
 | |
|         for location in fill_locations:
 | |
|             local_locations[location.player].append(location)
 | |
|         for locations in local_locations.values():
 | |
|             world.random.shuffle(locations)
 | |
| 
 | |
|         for player, items in localrestitempool.items():  # items already shuffled
 | |
|             player_local_locations = local_locations[player]
 | |
|             for item_to_place in items:
 | |
|                 if not player_local_locations:
 | |
|                     logging.warning(f"Ran out of local locations for player {player}, "
 | |
|                                     f"cannot place {item_to_place}.")
 | |
|                     break
 | |
|                 spot_to_fill = player_local_locations.pop()
 | |
|                 world.push_item(spot_to_fill, item_to_place, False)
 | |
|                 fill_locations.remove(spot_to_fill)
 | |
| 
 | |
|     world.random.shuffle(fill_locations)
 | |
| 
 | |
|     restitempool, fill_locations = fast_fill(world, restitempool, fill_locations)
 | |
|     unplaced = [item for item in progitempool + restitempool]
 | |
|     unfilled = [location.name for location in fill_locations]
 | |
| 
 | |
|     for location in fill_locations:
 | |
|         world.push_item(location, ItemFactory('Nothing', location.player), False)
 | |
| 
 | |
|     if unplaced or unfilled:
 | |
|         logging.warning('Unplaced items: %s - Unfilled Locations: %s', unplaced, unfilled)
 | |
| 
 | |
| 
 | |
| def fast_fill(world, item_pool: typing.List, fill_locations: typing.List) -> typing.Tuple[typing.List, typing.List]:
 | |
|     placing = min(len(item_pool), len(fill_locations))
 | |
|     for item, location in zip(item_pool, fill_locations):
 | |
|         world.push_item(location, item, False)
 | |
|     return item_pool[placing:], fill_locations[placing:]
 | |
| 
 | |
| 
 | |
| def flood_items(world):
 | |
|     # get items to distribute
 | |
|     world.random.shuffle(world.itempool)
 | |
|     itempool = world.itempool
 | |
|     progress_done = False
 | |
| 
 | |
|     # sweep once to pick up preplaced items
 | |
|     world.state.sweep_for_events()
 | |
| 
 | |
|     # fill world from top of itempool while we can
 | |
|     while not progress_done:
 | |
|         location_list = world.get_unfilled_locations()
 | |
|         world.random.shuffle(location_list)
 | |
|         spot_to_fill = None
 | |
|         for location in location_list:
 | |
|             if location.can_fill(world.state, itempool[0]):
 | |
|                 spot_to_fill = location
 | |
|                 break
 | |
| 
 | |
|         if spot_to_fill:
 | |
|             item = itempool.pop(0)
 | |
|             world.push_item(spot_to_fill, item, True)
 | |
|             continue
 | |
| 
 | |
|         # ran out of spots, check if we need to step in and correct things
 | |
|         if len(world.get_reachable_locations()) == len(world.get_locations()):
 | |
|             progress_done = True
 | |
|             continue
 | |
| 
 | |
|         # need to place a progress item instead of an already placed item, find candidate
 | |
|         item_to_place = None
 | |
|         candidate_item_to_place = None
 | |
|         for item in itempool:
 | |
|             if item.advancement:
 | |
|                 candidate_item_to_place = item
 | |
|                 if world.unlocks_new_location(item):
 | |
|                     item_to_place = item
 | |
|                     break
 | |
| 
 | |
|         # we might be in a situation where all new locations require multiple items to reach. If that is the case, just place any advancement item we've found and continue trying
 | |
|         if item_to_place is None:
 | |
|             if candidate_item_to_place is not None:
 | |
|                 item_to_place = candidate_item_to_place
 | |
|             else:
 | |
|                 raise FillError('No more progress items left to place.')
 | |
| 
 | |
|         # find item to replace with progress item
 | |
|         location_list = world.get_reachable_locations()
 | |
|         world.random.shuffle(location_list)
 | |
|         for location in location_list:
 | |
|             if location.item is not None and not location.item.advancement and not location.item.smallkey and not location.item.bigkey:
 | |
|                 # safe to replace
 | |
|                 replace_item = location.item
 | |
|                 replace_item.location = None
 | |
|                 itempool.append(replace_item)
 | |
|                 world.push_item(location, item_to_place, True)
 | |
|                 itempool.remove(item_to_place)
 | |
|                 break
 | |
| 
 | |
| 
 | |
| def balance_multiworld_progression(world):
 | |
|     balanceable_players = {player for player in range(1, world.players + 1) if world.progression_balancing[player]}
 | |
|     if not balanceable_players:
 | |
|         logging.info('Skipping multiworld progression balancing.')
 | |
|     else:
 | |
|         logging.info(f'Balancing multiworld progression for {len(balanceable_players)} Players.')
 | |
|         state = CollectionState(world)
 | |
|         checked_locations = []
 | |
|         unchecked_locations = world.get_locations().copy()
 | |
|         world.random.shuffle(unchecked_locations)
 | |
| 
 | |
|         reachable_locations_count = {player: 0 for player in world.player_ids}
 | |
| 
 | |
|         def get_sphere_locations(sphere_state, locations):
 | |
|             sphere_state.sweep_for_events(key_only=True, locations=locations)
 | |
|             return [loc for loc in locations if sphere_state.can_reach(loc)]
 | |
| 
 | |
|         while True:
 | |
|             sphere_locations = get_sphere_locations(state, unchecked_locations)
 | |
|             for location in sphere_locations:
 | |
|                 unchecked_locations.remove(location)
 | |
|                 reachable_locations_count[location.player] += 1
 | |
| 
 | |
|             if checked_locations:
 | |
|                 threshold = max(reachable_locations_count.values()) - 20
 | |
|                 balancing_players = [player for player, reachables in reachable_locations_count.items() if
 | |
|                                      reachables < threshold and player in balanceable_players]
 | |
|                 if balancing_players:
 | |
|                     balancing_state = state.copy()
 | |
|                     balancing_unchecked_locations = unchecked_locations.copy()
 | |
|                     balancing_reachables = reachable_locations_count.copy()
 | |
|                     balancing_sphere = sphere_locations.copy()
 | |
|                     candidate_items = collections.defaultdict(list)
 | |
|                     while True:
 | |
|                         for location in balancing_sphere:
 | |
|                             if location.event:
 | |
|                                 balancing_state.collect(location.item, True, location)
 | |
|                                 player = location.item.player
 | |
|                                 # only replace items that end up in another player's world
 | |
|                                 if not location.locked and player in balancing_players and location.player != player:
 | |
|                                     candidate_items[player].append(location)
 | |
|                         balancing_sphere = get_sphere_locations(balancing_state, balancing_unchecked_locations)
 | |
|                         for location in balancing_sphere:
 | |
|                             balancing_unchecked_locations.remove(location)
 | |
|                             balancing_reachables[location.player] += 1
 | |
|                         if world.has_beaten_game(balancing_state) or all(
 | |
|                                 reachables >= threshold for reachables in balancing_reachables.values()):
 | |
|                             break
 | |
|                         elif not balancing_sphere:
 | |
|                             raise RuntimeError('Not all required items reachable. Something went terribly wrong here.')
 | |
|                     unlocked_locations = collections.defaultdict(list)
 | |
|                     for l in unchecked_locations:
 | |
|                         if l not in balancing_unchecked_locations:
 | |
|                             unlocked_locations[l.player].append(l)
 | |
|                     items_to_replace = []
 | |
|                     for player in balancing_players:
 | |
|                         locations_to_test = unlocked_locations[player]
 | |
|                         items_to_test = candidate_items[player]
 | |
|                         while items_to_test:
 | |
|                             testing = items_to_test.pop()
 | |
|                             reducing_state = state.copy()
 | |
|                             for location in itertools.chain((l for l in items_to_replace if l.item.player == player),
 | |
|                                                             items_to_test):
 | |
| 
 | |
|                                 reducing_state.collect(location.item, True, location)
 | |
| 
 | |
|                             reducing_state.sweep_for_events(locations=locations_to_test)
 | |
| 
 | |
|                             if world.has_beaten_game(balancing_state):
 | |
|                                 if not world.has_beaten_game(reducing_state):
 | |
|                                     items_to_replace.append(testing)
 | |
|                             else:
 | |
|                                 reduced_sphere = get_sphere_locations(reducing_state, locations_to_test)
 | |
|                                 if reachable_locations_count[player] + len(reduced_sphere) < threshold:
 | |
|                                     items_to_replace.append(testing)
 | |
| 
 | |
|                     replaced_items = False
 | |
|                     replacement_locations = [l for l in checked_locations if not l.event and not l.locked]
 | |
|                     while replacement_locations and items_to_replace:
 | |
|                         new_location = replacement_locations.pop()
 | |
|                         old_location = items_to_replace.pop()
 | |
| 
 | |
|                         while not new_location.can_fill(state, old_location.item, False) or (
 | |
|                                 new_location.item and not old_location.can_fill(state, new_location.item, False)):
 | |
|                             replacement_locations.insert(0, new_location)
 | |
|                             new_location = replacement_locations.pop()
 | |
| 
 | |
|                         swap_location_item(old_location, new_location)
 | |
|                         logging.debug(f"Progression balancing moved {new_location.item} to {new_location}, "
 | |
|                                       f"displacing {old_location.item} into {old_location}")
 | |
|                         state.collect(new_location.item, True, new_location)
 | |
|                         replaced_items = True
 | |
| 
 | |
|                     if replaced_items:
 | |
|                         unlocked = [fresh for player in balancing_players for fresh in unlocked_locations[player]]
 | |
|                         for location in get_sphere_locations(state, unlocked):
 | |
|                             unchecked_locations.remove(location)
 | |
|                             reachable_locations_count[location.player] += 1
 | |
|                             sphere_locations.append(location)
 | |
| 
 | |
|             for location in sphere_locations:
 | |
|                 if location.event:
 | |
|                     state.collect(location.item, True, location)
 | |
|             checked_locations.extend(sphere_locations)
 | |
| 
 | |
|             if world.has_beaten_game(state):
 | |
|                 break
 | |
|             elif not sphere_locations:
 | |
|                 raise RuntimeError('Not all required items reachable. Something went terribly wrong here.')
 | |
| 
 | |
| 
 | |
| def swap_location_item(location_1: Location, location_2: Location, check_locked=True):
 | |
|     """Swaps Items of locations. Does NOT swap flags like shop_slot or locked, but does swap event"""
 | |
|     if check_locked:
 | |
|         if location_1.locked:
 | |
|             logging.warning(f"Swapping {location_1}, which is marked as locked.")
 | |
|         if location_2.locked:
 | |
|             logging.warning(f"Swapping {location_2}, which is marked as locked.")
 | |
|     location_2.item, location_1.item = location_1.item, location_2.item
 | |
|     location_1.item.location = location_1
 | |
|     location_2.item.location = location_2
 | |
|     location_1.event, location_2.event = location_2.event, location_1.event
 | |
| 
 | |
| 
 | |
| def distribute_planned(world):
 | |
|     world_name_lookup = {world.player_names[player_id][0]: player_id for player_id in world.player_ids}
 | |
| 
 | |
|     for player in world.player_ids:
 | |
|         placement: PlandoItem
 | |
|         for placement in world.plando_items[player]:
 | |
|             if placement.location in key_drop_data:
 | |
|                 placement.warn(
 | |
|                     f"Can't place '{placement.item}' at '{placement.location}', as key drop shuffle locations are not supported yet.")
 | |
|                 continue
 | |
|             item = ItemFactory(placement.item, player)
 | |
|             target_world: int = placement.world
 | |
|             if target_world is False or world.players == 1:
 | |
|                 target_world = player  # in own world
 | |
|             elif target_world is True:  # in any other world
 | |
|                 unfilled = list(location for location in world.get_unfilled_locations_for_players(
 | |
|                     placement.location,
 | |
|                     set(world.player_ids) - {player}) if location.item_rule(item)
 | |
|                                 )
 | |
|                 if not unfilled:
 | |
|                     placement.failed(f"Could not find a world with an unfilled location {placement.location}", FillError)
 | |
|                     continue
 | |
| 
 | |
|                 target_world = world.random.choice(unfilled).player
 | |
| 
 | |
|             elif target_world is None:  # any random world
 | |
|                 unfilled = list(location for location in world.get_unfilled_locations_for_players(
 | |
|                     placement.location,
 | |
|                     set(world.player_ids)) if location.item_rule(item)
 | |
|                                 )
 | |
|                 if not unfilled:
 | |
|                     placement.failed(f"Could not find a world with an unfilled location {placement.location}", FillError)
 | |
|                     continue
 | |
| 
 | |
|                 target_world = world.random.choice(unfilled).player
 | |
| 
 | |
|             elif type(target_world) == int:  # target world by player id
 | |
|                 if target_world not in range(1, world.players + 1):
 | |
|                     placement.failed(f"Cannot place item in world {target_world} as it is not in range of (1, {world.players})", ValueError)
 | |
|                     continue
 | |
|             else:  # find world by name
 | |
|                 if target_world not in world_name_lookup:
 | |
|                     placement.failed(f"Cannot place item to {target_world}'s world as that world does not exist.", ValueError)
 | |
|                     continue
 | |
|                 target_world = world_name_lookup[target_world]
 | |
| 
 | |
|             location = world.get_location(placement.location, target_world)
 | |
|             if location.item:
 | |
|                 placement.failed(f"Cannot place item into already filled location {location}.")
 | |
|                 continue
 | |
| 
 | |
|             if location.can_fill(world.state, item, False):
 | |
|                 world.push_item(location, item, collect=False)
 | |
|                 location.event = True  # flag location to be checked during fill
 | |
|                 location.locked = True
 | |
|                 logging.debug(f"Plando placed {item} at {location}")
 | |
|             else:
 | |
|                 placement.failed(f"Can't place {item} at {location} due to fill condition not met.")
 | |
|                 continue
 | |
| 
 | |
|             if placement.from_pool:  # Should happen AFTER the item is placed, in case it was allowed to skip failed placement.
 | |
|                 try:
 | |
|                     world.itempool.remove(item)
 | |
|                 except ValueError:
 | |
|                     placement.warn(f"Could not remove {item} from pool as it's already missing from it.")
 |