mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00

* Init * remove submodule * Init * Update docs * Fix tests * Update to use apcivvi * Update Readme and codeowners * Minor changes * Remove .value from options (except starting hint) * Minor updates * remove unnecessary property * Cleanup Rules and Region * Fix output file generation * Implement feedback * Remove 'AP' tag and fix issue with format strings and using same quotes * Update worlds/civ_6/__init__.py Co-authored-by: Scipio Wright <scipiowright@gmail.com> * Minor docs changes * minor updates * Small rework of create items * Minor updates * Remove unused variable * Move client to Launcher Components with rest of similar clients * Revert "Move client to Launcher Components with rest of similar clients" This reverts commit f9fd5df9fdf19eaf4f1de54e21e3c33a74f02364. * modify component * Fix generation issues * Fix tests * Minor change * Add improvement and test case * Minor options changes * . * Preliminary Review * Fix failing test due to slot data serialization * Format json * Remove exclude missable boosts * Update options (update goody hut text, make research multiplier a range) * Update docs punctuation and slot data init * Move priority/excluded locations into options * Implement docs PR feedback * PR Feedback for options * PR feedback misc * Update location classification and fix client type * Fix typings * Update research cost multiplier * Remove unnecessary location priority code * Remove extrenous use of items() * WIP PR Feedback * WIP PR Feedback * Add victory event * Add option set for death link effect * PR improvements * Update post fill hint to support items with multiple classifications * remove unnecessary len * Move location exclusion logic * Update test to use set instead of accidental dict * Update docs around progressive eras and boost locations * Update docs for options to be more readable * Fix issue with filler items and prehints * Update filler_data to be static * Update links in docs * Minor updates and PR feedback * Update boosts data 
* Update era required items * Update existing techs * Update existing techs * move boost data class * Update reward data * Update prereq data * Update new items and progressive districts * Remove unused code * Make filler item name func more efficient * Update death link text * Move Civ6 to the end of readme * Fix bug with hidden locations and location.name * Partial PR Feedback Implementation * Format changes * Minor review feedback * Modify access rules to use list created in generate_early * Modify boost rules to precalculate requirements * Remove option checks from access rules * Fix issue with pre initialized dicts * Add inno setup for civ6 client * Update inno_setup.iss --------- Co-authored-by: Scipio Wright <scipiowright@gmail.com> Co-authored-by: Exempt-Medic <60412657+Exempt-Medic@users.noreply.github.com> Co-authored-by: Exempt-Medic <ExemptMedic@Gmail.com> Co-authored-by: NewSoupVi <57900059+NewSoupVi@users.noreply.github.com>
343 lines
14 KiB
Python
343 lines
14 KiB
Python
import asyncio
|
|
import logging
|
|
import os
|
|
import traceback
|
|
from typing import Any, Dict, List, Optional
|
|
import zipfile
|
|
|
|
from CommonClient import ClientCommandProcessor, CommonContext, get_base_parser, logger, server_loop, gui_enabled
|
|
from .Data import get_progressive_districts_data
|
|
from .DeathLink import handle_check_deathlink
|
|
from NetUtils import ClientStatus
|
|
import Utils
|
|
from .CivVIInterface import CivVIInterface, ConnectionState
|
|
from .Enum import CivVICheckType
|
|
from .Items import CivVIItemData, generate_item_table, get_item_by_civ_name
|
|
from .Locations import CivVILocationData, generate_era_location_table
|
|
from .TunerClient import TunerErrorException, TunerTimeoutException
|
|
|
|
|
|
class CivVICommandProcessor(ClientCommandProcessor):
    """Console commands that are specific to the Civilization VI client.

    Every command is a no-op unless the attached context is a ``CivVIContext``.
    """

    def __init__(self, ctx: CommonContext):
        super().__init__(ctx)

    def _cmd_deathlink(self):
        """Toggle deathlink from client. Overrides default setting."""
        if isinstance(self.ctx, CivVIContext):
            self.ctx.death_link_enabled = not self.ctx.death_link_enabled
            # Flag the change so the re-enable itself is not treated as a death
            self.ctx.death_link_just_changed = True
            Utils.async_start(self.ctx.update_death_link(
                self.ctx.death_link_enabled), name="Update Deathlink")
            self.ctx.logger.info(f"Deathlink is now {'enabled' if self.ctx.death_link_enabled else 'disabled'}")

    def _cmd_resync(self):
        """Resends all items to client, and has client resend all locations to server. This can take up to a minute if the player has received a lot of items"""
        if isinstance(self.ctx, CivVIContext):
            logger.info("Resyncing...")
            # async_start (already used by _cmd_deathlink) instead of a bare
            # asyncio.create_task: a task nobody references may be garbage
            # collected before it finishes.
            Utils.async_start(self.ctx.resync(), name="Resync")

    def _cmd_toggle_progressive_eras(self):
        """If you get stuck for some reason and unable to continue your game, you can run this command to disable the defeat that comes from pushing past the max unlocked era """
        if isinstance(self.ctx, CivVIContext):
            # Log instead of print so the feedback goes through the same channel
            # as the other commands of this processor.
            logger.info("Toggling progressive eras, stand by...")
            # Only raise the flag here; handle_toggle_progressive_eras performs
            # the actual toggle.
            self.ctx.is_pending_toggle_progressive_eras = True
|
|
|
|
|
|
class CivVIContext(CommonContext):
    """Archipelago client context for Civilization VI.

    Owns the interface to the running game, the item/location lookup tables
    built from the generated data, and the flags shared between the console
    commands and the sync loop.
    """
    is_pending_death_link_reset = False
    # Raised by the toggle_progressive_eras command, consumed by handle_toggle_progressive_eras
    is_pending_toggle_progressive_eras = False
    command_processor = CivVICommandProcessor
    game = "Civilization VI"
    items_handling = 0b111
    tuner_sync_task: Optional[asyncio.Task[None]] = None
    game_interface: CivVIInterface
    # Class-level placeholders only: __init__ builds a private copy of each
    # table per instance, so these shared dicts are never mutated.
    location_name_to_civ_location: Dict[str, CivVILocationData] = {}
    location_name_to_id: Dict[str, int] = {}
    item_id_to_civ_item: Dict[int, CivVIItemData] = {}
    item_table: Dict[str, CivVIItemData] = {}
    # True while handle_receive_items is pushing a batch of items to the game
    processing_multiple_items = False
    received_death_link = False
    death_link_message = ""
    death_link_enabled = False
    slot_data: Dict[str, Any]

    # Used to prevent the deathlink from triggering when someone re enables it
    death_link_just_changed = False

    logger = logger
    progressive_items_by_type = get_progressive_districts_data()
    item_name_to_id = {
        item.name: item.code for item in generate_item_table().values()}
    connection_state = ConnectionState.DISCONNECTED

    def __init__(self, server_address: Optional[str], password: Optional[str], apcivvi_file: Optional[str] = None):
        """Create the context and build the per-instance lookup tables.

        Args:
            server_address: Address handed to ``CommonContext``.
            password: Password handed to ``CommonContext``.
            apcivvi_file: Optional path of the apcivvi file the client was started with.
        """
        super().__init__(server_address, password)
        self.slot_data: Dict[str, Any] = {}
        self.game_interface = CivVIInterface(logger)
        location_by_era = generate_era_location_table()
        self.item_table = generate_item_table()
        self.apcivvi_file = apcivvi_file

        # Get tables formatted in a way that is easier to use here.
        # Fresh dicts are bound on the instance first; writing through the
        # class-level dicts would share (and overwrite) state across instances.
        self.location_name_to_id = {}
        self.location_name_to_civ_location = {}
        for locations in location_by_era.values():
            for location in locations.values():
                self.location_name_to_id[location.name] = location.code
                self.location_name_to_civ_location[location.name] = location

        self.item_id_to_civ_item = {
            item.code: item for item in self.item_table.values()}

    async def resync(self):
        """Ask the game to resync, then replay every received item.

        Does nothing (besides logging) while a batch of items is still being
        processed.
        """
        if self.processing_multiple_items:
            logger.info(
                "Waiting for items to finish processing, try again later")
            return
        await self.game_interface.resync()
        # -1 as the last received index makes every item in the list count as new
        await handle_receive_items(self, -1)
        logger.info("Resynced")

    def on_deathlink(self, data: Dict[str, Any]) -> None:
        """Record an incoming death link so it can be applied to the game later.

        The message is the sender's "cause" when one was given, otherwise a
        generic text naming the source.
        """
        super().on_deathlink(data)
        text = data.get("cause", "")
        if text:
            message = text
        else:
            message = f"Received from {data['source']}"
        self.death_link_message = message
        self.received_death_link = True

    async def server_auth(self, password_requested: bool = False):
        """Authenticate against the server, asking for a password only if needed."""
        if password_requested and not self.password:
            await super(CivVIContext, self).server_auth(password_requested)
        await self.get_username()
        self.tags = set()
        await self.send_connect()

    def run_gui(self):
        """Start the Kivy UI for this client as a background task."""
        # Imported lazily so kvui is only loaded when the GUI is requested
        from kvui import GameManager

        class CivVIManager(GameManager):
            logging_pairs = [
                ("Client", "Archipelago")
            ]
            base_title = "Archipelago Civilization VI Client"

        self.ui = CivVIManager(self)
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")

    def on_package(self, cmd: str, args: Dict[str, Any]):
        """Store slot data on "Connected" and apply its death link setting."""
        if cmd == "Connected":
            self.slot_data = args["slot_data"]
            if "death_link" in args["slot_data"]:
                self.death_link_enabled = bool(args["slot_data"]["death_link"])
                Utils.async_start(self.update_death_link(
                    bool(args["slot_data"]["death_link"])))
|
|
|
|
|
|
def update_connection_status(ctx: CivVIContext, status: ConnectionState):
    """Store a new game connection state on ``ctx`` and announce the change.

    Nothing is logged or written when ``status`` equals the current state.
    """
    if ctx.connection_state == status:
        return

    # One log line per known state; unknown states are stored silently.
    announcements = (
        (ConnectionState.IN_GAME, "Connected to Civ VI"),
        (ConnectionState.IN_MENU, "Connected to Civ VI, waiting for game to start"),
        (ConnectionState.DISCONNECTED, "Disconnected from Civ VI, attempting to reconnect..."),
    )
    for known_state, message in announcements:
        if status == known_state:
            ctx.logger.info(message)
            break

    ctx.connection_state = status
|
|
|
|
|
|
async def tuner_sync_task(ctx: CivVIContext):
    """Poll the game until the client exits and drive the sync handlers.

    Each pass waits three seconds when there is no slot yet, when a batch of
    items is still being processed, when the game is not in an active session,
    or after any error. Errors are logged and never propagate out of the loop.
    """
    logger.info("Starting CivVI connector")
    while not ctx.exit_event.is_set():
        # Not connected to a slot yet: nothing to sync
        if not ctx.slot:
            await asyncio.sleep(3)
            continue

        try:
            if ctx.processing_multiple_items:
                # An item batch is in flight; stay out of its way
                await asyncio.sleep(3)
            else:
                game_state = await ctx.game_interface.is_in_game()
                update_connection_status(ctx, game_state)
                in_game = game_state == ConnectionState.IN_GAME
                if in_game:
                    await _handle_game_ready(ctx)
                else:
                    await asyncio.sleep(3)
        except TunerTimeoutException:
            logger.error(
                "Timeout occurred while receiving data from Civ VI, this usually isn't a problem unless you see it repeatedly")
            await asyncio.sleep(3)
        except TunerErrorException as tuner_error:
            # Tuner errors only need their message, not a stack trace
            logger.debug(str(tuner_error))
            await asyncio.sleep(3)
        except Exception:
            logger.debug(traceback.format_exc())
            await asyncio.sleep(3)
|
|
|
|
|
|
async def handle_toggle_progressive_eras(ctx: CivVIContext):
    """Apply a pending progressive-eras toggle, if one was requested.

    When the game currently reports a max allowed era above -1 the limit is
    set to -1; otherwise it is set to the number of ERA items received so far.
    """
    if not ctx.is_pending_toggle_progressive_eras:
        return

    # Clear the request first so it is handled once
    ctx.is_pending_toggle_progressive_eras = False
    max_allowed_era = await ctx.game_interface.get_max_allowed_era()
    if max_allowed_era > -1:
        await ctx.game_interface.set_max_allowed_era(-1)
        logger.info("Disabled progressive eras")
        return

    era_total = sum(
        1
        for received in ctx.items_received
        if ctx.item_id_to_civ_item[received.item].item_type == CivVICheckType.ERA
    )
    await ctx.game_interface.set_max_allowed_era(era_total)
    logger.info(f"Enabled progressive eras, set to {era_total}")
|
|
|
|
|
|
async def handle_checked_location(ctx: CivVIContext):
    """Report every location the game lists as checked to the server.

    The names returned by the game are mapped to location codes through
    ``ctx.location_name_to_civ_location`` and sent as one "LocationChecks"
    message.
    """
    names_from_game = await ctx.game_interface.get_checked_locations()

    location_codes = []
    for known_name, civ_location in ctx.location_name_to_civ_location.items():
        if known_name in names_from_game:
            location_codes.append(civ_location.code)

    await ctx.send_msgs([{"cmd": "LocationChecks", "locations": location_codes}])
|
|
|
|
|
|
async def handle_receive_items(ctx: CivVIContext, last_received_index_override: Optional[int] = None):
    """Give the game every received item it has not been handed yet.

    Args:
        ctx: Client context holding the received items and lookup tables.
        last_received_index_override: Index of the last item to treat as
            already delivered. When ``None`` the value is read from the game.
            Any integer, including ``0``, is honoured as given.

    ``ctx.processing_multiple_items`` is raised while more than one item is
    outstanding and is always cleared on exit, even when an error occurs.
    """
    try:
        # Explicit None check: "override or ..." would discard an override of 0
        if last_received_index_override is not None:
            last_received_index = last_received_index_override
        else:
            last_received_index = await ctx.game_interface.get_last_received_index()

        if len(ctx.items_received) - last_received_index > 1:
            ctx.processing_multiple_items = True

        progressive_districts: List[CivVIItemData] = []
        progressive_eras: List[CivVIItemData] = []
        for index, network_item in enumerate(ctx.items_received):

            # Track these separately so if we replace "PROGRESSIVE_DISTRICT" with a specific tech, we can still check if need to add it to the list of districts
            item: CivVIItemData = ctx.item_id_to_civ_item[network_item.item]
            item_to_send: CivVIItemData = item
            if index > last_received_index:
                if item.item_type == CivVICheckType.PROGRESSIVE_DISTRICT and item.civ_name:
                    # if the item is progressive, then check how far in that progression type we are and send the appropriate item
                    count = sum(
                        1 for count_item in progressive_districts if count_item.civ_name == item.civ_name)

                    if count >= len(ctx.progressive_items_by_type[item.civ_name]):
                        logger.error(
                            f"Received more progressive items than expected for {item.civ_name}")
                        continue

                    item_civ_name = ctx.progressive_items_by_type[item.civ_name][count]
                    actual_item_name = get_item_by_civ_name(item_civ_name, ctx.item_table).name
                    item_to_send = ctx.item_table[actual_item_name]

                sender = ctx.player_names[network_item.player]
                if item.item_type == CivVICheckType.ERA:
                    # Eras are sent together with the running era count (1-based)
                    count = len(progressive_eras) + 1
                    await ctx.game_interface.give_item_to_player(item_to_send, sender, count)
                elif item.item_type == CivVICheckType.GOODY and item_to_send.civ_name:
                    await ctx.game_interface.give_item_to_player(item_to_send, sender, game_id_override=item_to_send.civ_name)
                else:
                    await ctx.game_interface.give_item_to_player(item_to_send, sender)
                # Short pause between consecutive items sent to the game
                await asyncio.sleep(0.02)

            # Already-delivered items are still counted so progression stays correct
            if item.item_type == CivVICheckType.PROGRESSIVE_DISTRICT:
                progressive_districts.append(item)
            elif item.item_type == CivVICheckType.ERA:
                progressive_eras.append(item)
    finally:
        # Runs on success and on error alike, so item processing is always unblocked
        ctx.processing_multiple_items = False
|
|
|
|
|
|
async def handle_check_goal_complete(ctx: CivVIContext):
    """Tell the server the goal is complete once the game reports a victory.

    The game is not queried again after ``ctx.finished_game`` has been set.
    """
    # Short-circuit keeps the game from being asked when already finished
    if ctx.finished_game or not await ctx.game_interface.check_victory():
        return

    logger.info("Sending Victory to server!")
    goal_message = {"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}
    await ctx.send_msgs([goal_message])
    ctx.finished_game = True
|
|
|
|
|
|
async def _handle_game_ready(ctx: CivVIContext):
    """Run one full sync pass while the game is in an active session.

    Every path through this function ends with a three second sleep, which
    paces the calling loop.
    """
    if not ctx.server:
        logger.info("Waiting for player to connect to server")
        await asyncio.sleep(3)
        return

    if not ctx.slot:
        await asyncio.sleep(3)
        return

    # Items first, then locations, then the goal check
    for handler in (handle_receive_items, handle_checked_location, handle_check_goal_complete):
        await handler(ctx)

    if ctx.death_link_enabled:
        await handle_check_deathlink(ctx)

    # process pending commands
    await handle_toggle_progressive_eras(ctx)
    await asyncio.sleep(3)
|
|
|
|
|
|
def main(connect: Optional[str] = None, password: Optional[str] = None, name: Optional[str] = None):
    """Run the Civilization VI client until it is closed.

    Args:
        connect: Server address passed to the context.
        password: Server password passed to the context.
        name: Slot name used as the context's auth name.
    """
    Utils.init_logging("Civilization VI Client")

    async def _main(connect: Optional[str], password: Optional[str], name: Optional[str]):
        parser = get_base_parser()
        parser.add_argument("apcivvi_file", default="", type=str, nargs="?", help="Path to apcivvi file")
        # parse_known_args: a wrapper such as debug_main may already have
        # consumed flags (--name, --debug) that this parser does not declare;
        # parse_args() would abort on them.
        args, _unknown = parser.parse_known_args()
        ctx = CivVIContext(connect, password, args.apcivvi_file)

        if args.apcivvi_file:
            parent_dir: str = os.path.dirname(args.apcivvi_file)
            target_name: str = os.path.basename(args.apcivvi_file).replace(".apcivvi", "-MOD-FILES")
            target_path: str = os.path.join(parent_dir, target_name)
            # Extract only once: an existing target folder is left untouched
            if not os.path.exists(target_path):
                os.makedirs(target_path, exist_ok=True)
                logger.info("Extracting mod files to %s", target_path)
                with zipfile.ZipFile(args.apcivvi_file, "r") as zip_ref:
                    zip_ref.extractall(target_path)

        ctx.auth = name
        ctx.server_task = asyncio.create_task(
            server_loop(ctx), name="ServerLoop")
        if gui_enabled:
            ctx.run_gui()
        await asyncio.sleep(1)

        ctx.tuner_sync_task = asyncio.create_task(
            tuner_sync_task(ctx), name="TunerSync")

        await ctx.exit_event.wait()
        ctx.server_address = None

        await ctx.shutdown()

        if ctx.tuner_sync_task:
            # The sync loop sleeps in three second steps; give it time to see
            # the exit event before awaiting it
            await asyncio.sleep(3)
            await ctx.tuner_sync_task

    import colorama

    colorama.init()
    try:
        asyncio.run(_main(connect, password, name))
    finally:
        # Undo colorama's stream wrapping even when the client exits with an error
        colorama.deinit()
|
|
|
|
|
|
def debug_main():
    """Command line entry point with extra ``--name`` and ``--debug`` options.

    Parses the command line, switches the logger to DEBUG when ``--debug`` is
    given, then starts the client through ``main``.
    """
    parser = get_base_parser()
    parser.add_argument("apcivvi_file", default="", type=str, nargs="?", help="Path to apcivvi file")
    parser.add_argument("--name", default=None,
                        help="Slot Name to connect as.")
    # nargs="?" with const=True lets a bare "--debug" work as a flag while the
    # previous "--debug VALUE" form is still accepted.
    parser.add_argument("--debug", default=None, nargs="?", const=True,
                        help="debug mode, additional logging")
    args = parser.parse_args()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    main(args.connect, args.password, args.name)
|