258 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			258 lines
		
	
	
		
			9.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import logging
 | |
| import os
 | |
| import threading
 | |
| import unittest
 | |
| from contextlib import contextmanager
 | |
| from typing import Dict, ClassVar, Iterable, Tuple, Optional, List, Union, Any
 | |
| 
 | |
| from BaseClasses import MultiWorld, CollectionState, get_seed, Location, Item
 | |
| from test.bases import WorldTestBase
 | |
| from test.general import gen_steps, setup_solo_multiworld as setup_base_solo_multiworld
 | |
| from worlds.AutoWorld import call_all
 | |
| from .assertion import RuleAssertMixin
 | |
| from .options.utils import fill_namespace_with_default, parse_class_option_keys, fill_dataclass_with_default
 | |
| from .. import StardewValleyWorld, StardewItem
 | |
| from ..options import StardewValleyOption
 | |
| 
 | |
| logger = logging.getLogger(__name__)
 | |
| 
 | |
| DEFAULT_TEST_SEED = get_seed()
 | |
| logger.info(f"Default Test Seed: {DEFAULT_TEST_SEED}")
 | |
| 
 | |
| 
 | |
| class SVTestCase(unittest.TestCase):
 | |
|     # Set False to not skip some 'extra' tests
 | |
|     skip_base_tests: bool = True
 | |
|     # Set False to run tests that take long
 | |
|     skip_long_tests: bool = True
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls) -> None:
 | |
|         super().setUpClass()
 | |
|         base_tests_key = "base"
 | |
|         if base_tests_key in os.environ:
 | |
|             cls.skip_base_tests = not bool(os.environ[base_tests_key])
 | |
|         long_tests_key = "long"
 | |
|         if long_tests_key in os.environ:
 | |
|             cls.skip_long_tests = not bool(os.environ[long_tests_key])
 | |
| 
 | |
|     @contextmanager
 | |
|     def solo_world_sub_test(self, msg: Optional[str] = None,
 | |
|                             /,
 | |
|                             world_options: Optional[Dict[Union[str, StardewValleyOption], Any]] = None,
 | |
|                             *,
 | |
|                             seed=DEFAULT_TEST_SEED,
 | |
|                             world_caching=True,
 | |
|                             **kwargs) -> Tuple[MultiWorld, StardewValleyWorld]:
 | |
|         if msg is not None:
 | |
|             msg += " "
 | |
|         else:
 | |
|             msg = ""
 | |
|         msg += f"[Seed = {seed}]"
 | |
| 
 | |
|         with self.subTest(msg, **kwargs):
 | |
|             with solo_multiworld(world_options, seed=seed, world_caching=world_caching) as (multiworld, world):
 | |
|                 yield multiworld, world
 | |
| 
 | |
| 
 | |
| class SVTestBase(RuleAssertMixin, WorldTestBase, SVTestCase):
 | |
|     game = "Stardew Valley"
 | |
|     world: StardewValleyWorld
 | |
|     player: ClassVar[int] = 1
 | |
| 
 | |
|     seed = DEFAULT_TEST_SEED
 | |
| 
 | |
|     @classmethod
 | |
|     def setUpClass(cls) -> None:
 | |
|         if cls is SVTestBase:
 | |
|             raise unittest.SkipTest("No running tests on SVTestBase import.")
 | |
| 
 | |
|         super().setUpClass()
 | |
| 
 | |
|     def world_setup(self, *args, **kwargs):
 | |
|         self.options = parse_class_option_keys(self.options)
 | |
| 
 | |
|         self.multiworld = setup_solo_multiworld(self.options, seed=self.seed)
 | |
|         self.multiworld.lock.acquire()
 | |
|         world = self.multiworld.worlds[self.player]
 | |
| 
 | |
|         self.original_state = self.multiworld.state.copy()
 | |
|         self.original_itempool = self.multiworld.itempool.copy()
 | |
|         self.unfilled_locations = self.multiworld.get_unfilled_locations(1)
 | |
|         if self.constructed:
 | |
|             self.world = world  # noqa
 | |
| 
 | |
|     def tearDown(self) -> None:
 | |
|         self.multiworld.state = self.original_state
 | |
|         self.multiworld.itempool = self.original_itempool
 | |
|         for location in self.unfilled_locations:
 | |
|             location.item = None
 | |
| 
 | |
|         self.multiworld.lock.release()
 | |
| 
 | |
|     @property
 | |
|     def run_default_tests(self) -> bool:
 | |
|         if self.skip_base_tests:
 | |
|             return False
 | |
|         return super().run_default_tests
 | |
| 
 | |
|     def collect_lots_of_money(self, percent: float = 0.25):
 | |
|         self.collect("Shipping Bin")
 | |
|         real_total_prog_items = self.world.total_progression_items
 | |
|         required_prog_items = int(round(real_total_prog_items * percent))
 | |
|         self.collect("Stardrop", required_prog_items)
 | |
| 
 | |
|     def collect_all_the_money(self):
 | |
|         self.collect_lots_of_money(0.95)
 | |
| 
 | |
|     def collect_everything(self):
 | |
|         non_event_items = [item for item in self.multiworld.get_items() if item.code]
 | |
|         for item in non_event_items:
 | |
|             self.multiworld.state.collect(item)
 | |
| 
 | |
|     def collect_all_except(self, item_to_not_collect: str):
 | |
|         non_event_items = [item for item in self.multiworld.get_items() if item.code]
 | |
|         for item in non_event_items:
 | |
|             if item.name != item_to_not_collect:
 | |
|                 self.multiworld.state.collect(item)
 | |
| 
 | |
|     def get_real_locations(self) -> List[Location]:
 | |
|         return [location for location in self.multiworld.get_locations(self.player) if location.address is not None]
 | |
| 
 | |
|     def get_real_location_names(self) -> List[str]:
 | |
|         return [location.name for location in self.get_real_locations()]
 | |
| 
 | |
|     def collect(self, item: Union[str, Item, Iterable[Item]], count: int = 1) -> Union[None, Item, List[Item]]:
 | |
|         assert count > 0
 | |
| 
 | |
|         if not isinstance(item, str):
 | |
|             super().collect(item)
 | |
|             return
 | |
| 
 | |
|         if count == 1:
 | |
|             item = self.create_item(item)
 | |
|             self.multiworld.state.collect(item)
 | |
|             return item
 | |
| 
 | |
|         items = []
 | |
|         for i in range(count):
 | |
|             item = self.create_item(item)
 | |
|             self.multiworld.state.collect(item)
 | |
|             items.append(item)
 | |
| 
 | |
|         return items
 | |
| 
 | |
|     def create_item(self, item: str) -> StardewItem:
 | |
|         return self.world.create_item(item)
 | |
| 
 | |
|     def remove_one_by_name(self, item: str) -> None:
 | |
|         self.remove(self.create_item(item))
 | |
| 
 | |
|     def reset_collection_state(self):
 | |
|         self.multiworld.state = self.original_state.copy()
 | |
| 
 | |
| 
 | |
| pre_generated_worlds = {}
 | |
| 
 | |
| 
 | |
| @contextmanager
 | |
| def solo_multiworld(world_options: Optional[Dict[Union[str, StardewValleyOption], Any]] = None,
 | |
|                     *,
 | |
|                     seed=DEFAULT_TEST_SEED,
 | |
|                     world_caching=True) -> Tuple[MultiWorld, StardewValleyWorld]:
 | |
|     if not world_caching:
 | |
|         multiworld = setup_solo_multiworld(world_options, seed, _cache={})
 | |
|         yield multiworld, multiworld.worlds[1]
 | |
|     else:
 | |
|         multiworld = setup_solo_multiworld(world_options, seed)
 | |
|         multiworld.lock.acquire()
 | |
|         world = multiworld.worlds[1]
 | |
| 
 | |
|         original_state = multiworld.state.copy()
 | |
|         original_itempool = multiworld.itempool.copy()
 | |
|         unfilled_locations = multiworld.get_unfilled_locations(1)
 | |
| 
 | |
|         yield multiworld, world
 | |
| 
 | |
|         multiworld.state = original_state
 | |
|         multiworld.itempool = original_itempool
 | |
|         for location in unfilled_locations:
 | |
|             location.item = None
 | |
| 
 | |
|         multiworld.lock.release()
 | |
| 
 | |
| 
 | |
| # Mostly a copy of test.general.setup_solo_multiworld, I just don't want to change the core.
 | |
| def setup_solo_multiworld(test_options: Optional[Dict[Union[str, StardewValleyOption], str]] = None,
 | |
|                           seed=DEFAULT_TEST_SEED,
 | |
|                           _cache: Dict[frozenset, MultiWorld] = {},  # noqa
 | |
|                           _steps=gen_steps) -> MultiWorld:
 | |
|     test_options = parse_class_option_keys(test_options)
 | |
| 
 | |
|     # Yes I reuse the worlds generated between tests, its speeds the execution by a couple seconds
 | |
|     # If the simple dict caching ends up taking too much memory, we could replace it with some kind of lru cache. 
 | |
|     should_cache = "start_inventory" not in test_options
 | |
|     if should_cache:
 | |
|         frozen_options = frozenset(test_options.items()).union({("seed", seed)})
 | |
|         cached_multi_world = search_world_cache(_cache, frozen_options)
 | |
|         if cached_multi_world:
 | |
|             print(f"Using cached solo multi world [Seed = {cached_multi_world.seed}] [Cache size = {len(_cache)}]")
 | |
|             return cached_multi_world
 | |
| 
 | |
|     multiworld = setup_base_solo_multiworld(StardewValleyWorld, (), seed=seed)
 | |
|     # print(f"Seed: {multiworld.seed}") # Uncomment to print the seed for every test
 | |
| 
 | |
|     args = fill_namespace_with_default(test_options)
 | |
|     multiworld.set_options(args)
 | |
| 
 | |
|     if "start_inventory" in test_options:
 | |
|         for item, amount in test_options["start_inventory"].items():
 | |
|             for _ in range(amount):
 | |
|                 multiworld.push_precollected(multiworld.create_item(item, 1))
 | |
| 
 | |
|     for step in _steps:
 | |
|         call_all(multiworld, step)
 | |
| 
 | |
|     if should_cache:
 | |
|         add_to_world_cache(_cache, frozen_options, multiworld)  # noqa
 | |
| 
 | |
|     # Lock is needed for multi-threading tests
 | |
|     setattr(multiworld, "lock", threading.Lock())
 | |
| 
 | |
|     return multiworld
 | |
| 
 | |
| 
 | |
| def search_world_cache(cache: Dict[frozenset, MultiWorld], frozen_options: frozenset) -> Optional[MultiWorld]:
 | |
|     try:
 | |
|         return cache[frozen_options]
 | |
|     except KeyError:
 | |
|         for cached_options, multi_world in cache.items():
 | |
|             if frozen_options.issubset(cached_options):
 | |
|                 return multi_world
 | |
|         return None
 | |
| 
 | |
| 
 | |
| def add_to_world_cache(cache: Dict[frozenset, MultiWorld], frozen_options: frozenset, multi_world: MultiWorld) -> None:
 | |
|     # We could complete the key with all the default options, but that does not seem to improve performances.
 | |
|     cache[frozen_options] = multi_world
 | |
| 
 | |
| 
 | |
| def setup_multiworld(test_options: Iterable[Dict[str, int]] = None, seed=None) -> MultiWorld:  # noqa
 | |
|     if test_options is None:
 | |
|         test_options = []
 | |
| 
 | |
|     multiworld = MultiWorld(len(test_options))
 | |
|     multiworld.player_name = {}
 | |
|     multiworld.set_seed(seed)
 | |
|     multiworld.state = CollectionState(multiworld)
 | |
|     for i in range(1, len(test_options) + 1):
 | |
|         multiworld.game[i] = StardewValleyWorld.game
 | |
|         multiworld.player_name.update({i: f"Tester{i}"})
 | |
|     args = fill_namespace_with_default(test_options)
 | |
|     multiworld.set_options(args)
 | |
| 
 | |
|     for step in gen_steps:
 | |
|         call_all(multiworld, step)
 | |
| 
 | |
|     return multiworld
 |