mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00

- Removes the Pokémon Client, adding support for Red and Blue to the BizHawk Client. - Adds `/bank` commands that mirror SDV's, allowing money to be transferred into and out of the EnergyLink storage. - Adds a fix to the base patch so that the progressive card key counter will not increment beyond 10, which would lead to receiving glitch items. As part of crash detection, the client verifies that this value is not greater than 10, to prevent erroneous location checks when the game crashes, so the fix is relevant to the new client (although exceeding 10 should not happen unless you are using !getitem or have put progressive card keys in as item link replacement items).
278 lines
13 KiB
Python
278 lines
13 KiB
Python
import base64
|
|
import logging
|
|
import time
|
|
|
|
from NetUtils import ClientStatus
|
|
from worlds._bizhawk.client import BizHawkClient
|
|
from worlds._bizhawk import read, write, guarded_write
|
|
|
|
from worlds.pokemon_rb.locations import location_data
|
|
|
|
logger = logging.getLogger("Client")

# Multiplier applied to an in-game dollar amount when it is added to, or
# compared against, the server-side EnergyLink value.
BANK_EXCHANGE_RATE = 100000000

# Regions read from the "WRAM" memory domain on every watcher tick.
# Each value is a (start address, length in bytes) pair.
DATA_LOCATIONS = {
    "ItemIndex": (0x1A6E, 0x02),
    "Deathlink": (0x00FD, 0x01),
    "APItem": (0x00FF, 0x01),
    "EventFlag": (0x1735, 0x140),
    "Missable": (0x161A, 0x20),
    "Hidden": (0x16DE, 0x0E),
    "Rod": (0x1716, 0x01),
    "DexSanityFlag": (0x1A71, 19),
    "GameStatus": (0x1A84, 0x01),
    # Money is stored as binary-coded decimal.
    "Money": (0x141F, 3),
    "ResetCheck": (0x0100, 4),
    # First and second Vermilion Gym trash can selection. Second is not used, so should always be 0.
    # First should never be above 0x0F. This is just before Event Flags.
    "CrashCheck1": (0x1731, 2),
    # Unused, should always be 0. This is just before Missables flags.
    "CrashCheck2": (0x1617, 1),
    # Progressive keys, should never be above 10. Just before Dexsanity flags.
    "CrashCheck3": (0x1A70, 1),
    # Route 18 script value. Should never be above 2. Just before Hidden items flags.
    "CrashCheck4": (0x16DD, 1),
}
|
|
|
|
# location_map: flag-type name -> {flag id (or pair of flag ids): location id}.
# The flag-type name is the class name of the location's ``ram_address``
# object; locations whose ``ram_address`` is a list of two flags are filed
# under "list" and keyed by the pair of flag ids.
location_map = {"Rod": {}, "EventFlag": {}, "Missable": {}, "Hidden": {}, "list": {}, "DexSanityFlag": {}}

# location_bytes_bits: location id -> {'byte', 'bit'} position of its flag
# (or a two-element list of such positions for "list" locations).
location_bytes_bits = {}

for location in location_data:
    if location.ram_address is None:
        continue
    if isinstance(location.ram_address, list):
        location_map["list"][(location.ram_address[0].flag, location.ram_address[1].flag)] = location.address
        location_bytes_bits[location.address] = [
            {'byte': location.ram_address[0].byte, 'bit': location.ram_address[0].bit},
            {'byte': location.ram_address[1].byte, 'bit': location.ram_address[1].bit},
        ]
    else:
        location_map[type(location.ram_address).__name__][location.ram_address.flag] = location.address
        location_bytes_bits[location.address] = {'byte': location.ram_address.byte,
                                                 'bit': location.ram_address.bit}

# Name -> id lookup restricted to "Item" locations that have an address.
location_name_to_id = {
    location.name: location.address
    for location in location_data
    if location.type == "Item" and location.address is not None
}
|
|
|
|
|
|
class PokemonRBClient(BizHawkClient):
    """BizHawk client handler for Pokémon Red and Blue.

    On every watcher tick the handler reads the regions listed in
    ``DATA_LOCATIONS`` from WRAM and uses them to hand received items to the
    game, report checked locations, create automatic hints, relay DeathLink,
    execute a pending ``/bank`` command and report the goal.
    """

    system = ("GB", "SGB")
    patch_suffix = (".apred", ".apblue")
    game = "Pokemon Red and Blue"

    # Largest wallet value that fits in the three BCD bytes of "Money".
    _MAX_MONEY = 999999

    def __init__(self):
        super().__init__()
        # Location ids for which a hint has already been requested.
        self.auto_hints = set()
        # Last set of checked location ids that was sent to the server.
        self.locations_array = None
        # Set by on_package on a seed mismatch; acted on by game_watcher.
        self.disconnect_pending = False
        # Set by on_package when slot data enables death link.
        self.set_deathlink = False
        # Pending /bank amount: positive = deposit, negative = withdrawal.
        self.banking_command = None
        # True while a save is loaded and the crash checks pass.
        self.game_state = False
        # Timestamp of the newest death link already handled.
        self.last_death_link = 0

    async def validate_rom(self, ctx):
        """Return True and configure ``ctx`` if the loaded ROM is Red or Blue.

        Returns False when the ROM title does not match or when the embedded
        seed name cannot be decoded as ASCII.
        """
        title = (await read(ctx.bizhawk_ctx, [(0x134, 12, "ROM")]))[0]
        # Compare raw bytes: decoding the title of an arbitrary GB ROM as ASCII
        # would raise UnicodeDecodeError for any title with high-bit bytes.
        if title not in (b"POKEMON RED\x00", b"POKEMON BLUE"):
            return False

        raw_seed_name = (await read(ctx.bizhawk_ctx, [(0xFFDB, 21, "ROM")]))[0]
        try:
            seed_name = raw_seed_name.split(b"\0")[0].decode("ascii")
        except UnicodeDecodeError:
            # Not a seed name this client can use; do not claim the ROM.
            return False

        ctx.game = self.game
        ctx.items_handling = 0b001
        ctx.command_processor.commands["bank"] = cmd_bank
        ctx.seed_name = seed_name
        # Reset per-ROM state.
        self.set_deathlink = False
        self.banking_command = None
        self.locations_array = None
        self.disconnect_pending = False
        return True

    async def set_auth(self, ctx):
        """Set ``ctx.auth`` from the name data stored in the ROM."""
        rom_name = (await read(ctx.bizhawk_ctx, [(0xFFC6, 21, "ROM")]))[0]
        if rom_name == bytes(21):
            # rom was patched before rom names implemented, use player name
            player_name = (await read(ctx.bizhawk_ctx, [(0xFFF0, 16, "ROM")]))[0]
            ctx.auth = player_name.decode("ascii").split("\x00")[0]
        else:
            ctx.auth = base64.b64encode(rom_name).decode()

    async def game_watcher(self, ctx):
        """Run one synchronisation pass between the game and the server.

        Does nothing while the server connection is not open, and stops early
        when no save is loaded or the crash checks fail.
        """
        if not ctx.server or not ctx.server.socket.open or ctx.server.socket.closed:
            return

        values = await read(ctx.bizhawk_ctx, [(address, size, "WRAM")
                                              for address, size in DATA_LOCATIONS.values()])
        data = dict(zip(DATA_LOCATIONS, values))

        if self.set_deathlink:
            self.set_deathlink = False
            await ctx.update_death_link(True)

        if self.disconnect_pending:
            self.disconnect_pending = False
            await ctx.disconnect()
            # The connection is gone: anything sent past this point would be
            # dropped while local bookkeeping (locations_array) still advanced.
            return

        if data["GameStatus"][0] == 0 or data["ResetCheck"] == b'\xff\xff\xff\x7f':
            # Do not handle anything before game save is loaded
            self.game_state = False
            return
        if self._game_crashed(data):
            # Should mean game crashed
            logger.warning("Pokémon Red/Blue game may have crashed. Disconnecting from server.")
            self.game_state = False
            await ctx.disconnect()
            return
        self.game_state = True

        await self._deliver_next_item(ctx, data)
        await self._report_checked_locations(ctx, data)
        await self._send_auto_hints(ctx, data)
        await self._sync_death_link(ctx, data)
        await self._process_banking(ctx, data)
        await self._report_goal(ctx, data)

    @staticmethod
    def _game_crashed(data):
        """Return True if any sanity value read from WRAM is out of range."""
        return bool(
            data["GameStatus"][0] not in (0x2A, 0xAC)
            or data["CrashCheck1"][0] & 0xF0 or data["CrashCheck1"][1] & 0xFF
            or data["CrashCheck2"][0]
            or data["CrashCheck3"][0] > 10
            or data["CrashCheck4"][0] > 2
        )

    async def _deliver_next_item(self, ctx, data):
        """Write the next undelivered item into the game's "APItem" byte."""
        # A non-zero byte means the previously written item is still pending.
        if data["APItem"][0] != 0:
            return
        item_index = int.from_bytes(data["ItemIndex"], "little")
        if len(ctx.items_received) > item_index:
            item_code = ctx.items_received[item_index].item - 172000000
            if item_code > 255:
                item_code -= 256
            await write(ctx.bizhawk_ctx, [(DATA_LOCATIONS["APItem"][0], [item_code], "WRAM")])

    async def _report_checked_locations(self, ctx, data):
        """Send the set of locations whose flags are set, when it has changed."""
        locations = set()
        for flag_type, loc_map in location_map.items():
            for loc_id in loc_map.values():
                position = location_bytes_bits[loc_id]
                if flag_type == "list":
                    # Two-flag locations need both the event flag and the missable flag.
                    event, missable = position[0], position[1]
                    if (data["EventFlag"][event['byte']] & (1 << event['bit'])
                            and data["Missable"][missable['byte']] & (1 << missable['bit'])):
                        locations.add(loc_id)
                elif data[flag_type][position['byte']] & (1 << position['bit']):
                    locations.add(loc_id)

        if locations and locations != self.locations_array:
            self.locations_array = locations
            await ctx.send_msgs([{"cmd": "LocationChecks", "locations": list(locations)}])

    async def _send_auto_hints(self, ctx, data):
        """Request hints for locations selected by event flags and fossil choice."""
        hint_names = []
        for mask, name in ((16, "Cerulean Bicycle Shop"),
                           (32, "Route 2 Gate - Oak's Aide"),
                           (64, "Route 11 Gate 2F - Oak's Aide"),
                           (128, "Route 15 Gate 2F - Oak's Aide")):
            if data["EventFlag"][280] & mask:
                hint_names.append(name)
        if data["EventFlag"][281] & 1:
            hint_names += ["Celadon Prize Corner - Item Prize 1", "Celadon Prize Corner - Item Prize 2",
                           "Celadon Prize Corner - Item Prize 3"]

        # Once exactly one fossil has been checked, hint the other one.
        fossil_a = location_name_to_id["Fossil - Choice A"]
        fossil_b = location_name_to_id["Fossil - Choice B"]
        if fossil_a in ctx.checked_locations and fossil_b not in ctx.checked_locations:
            hint_names.append("Fossil - Choice B")
        elif fossil_b in ctx.checked_locations and fossil_a not in ctx.checked_locations:
            hint_names.append("Fossil - Choice A")

        hints = []
        for name in hint_names:
            loc_id = location_name_to_id[name]
            if (loc_id not in self.auto_hints and loc_id in ctx.missing_locations
                    and loc_id not in ctx.locations_checked):
                hints.append(loc_id)
        if hints:
            await ctx.send_msgs([{"cmd": "LocationScouts", "locations": hints, "create_as_hint": 2}])
            self.auto_hints.update(hints)

    async def _sync_death_link(self, ctx, data):
        """Relay deaths between the game's "Deathlink" byte and the server."""
        if "DeathLink" not in ctx.tags:
            return
        deathlink_address = DATA_LOCATIONS["Deathlink"][0]
        if data["Deathlink"][0] == 3:
            # The game reports a death: send it, then clear the byte.
            player = ctx.player_names[ctx.slot]
            await ctx.send_death(player + " is out of usable Pokémon! " + player + " blacked out!")
            await write(ctx.bizhawk_ctx, [(deathlink_address, [0], "WRAM")])
            self.last_death_link = ctx.last_death_link
        elif ctx.last_death_link > self.last_death_link:
            # A newer death arrived from the server: write 1 into the byte.
            self.last_death_link = ctx.last_death_link
            await write(ctx.bizhawk_ctx, [(deathlink_address, [1], "WRAM")])

    async def _process_banking(self, ctx, data):
        """Execute the pending ``/bank`` command, if any, then clear it."""
        amount = self.banking_command
        if not amount:
            return

        original_money = data["Money"]
        # Money is stored as binary-coded decimal.
        money = int(original_money.hex())
        energy_key = f"EnergyLink{ctx.team}"
        # The key may be absent or None; treat both as an empty balance.
        stored = ctx.stored_data.get(energy_key) or 0

        if amount > money:
            logger.warning(f"You do not have ${amount} to deposit!")
        elif -amount * BANK_EXCHANGE_RATE > stored:
            logger.warning("Not enough money in the EnergyLink storage!")
        else:
            withdrawing = amount < 0
            # Cap the resulting wallet at six decimal digits; only a withdrawal
            # can exceed it. A larger value would not fit the three BCD bytes.
            if money - amount > self._MAX_MONEY:
                amount = money - self._MAX_MONEY
            digits = str(money - amount).zfill(6)
            # Reading each decimal digit pair as hex yields the BCD byte.
            new_money = [int(digits[:2], 16), int(digits[2:4], 16), int(digits[4:], 16)]
            money_address = DATA_LOCATIONS["Money"][0]
            # Guarded on the wallet bytes still matching what was read this tick.
            money_written = await guarded_write(ctx.bizhawk_ctx,
                                                [(money_address, new_money, "WRAM")],
                                                [(money_address, original_money, "WRAM")])
            if money_written:
                if withdrawing:
                    logger.info(f"Withdrew ${-amount}.")
                else:
                    deposit = amount - amount // 4
                    tax = amount - deposit
                    logger.info(f"Deposited ${deposit}, and charged a tax of ${tax}.")
                    amount = deposit
                await ctx.send_msgs([{
                    "cmd": "Set", "key": energy_key, "operations":
                        [{"operation": "add", "value": amount * BANK_EXCHANGE_RATE},
                         {"operation": "max", "value": 0}],
                }])
        self.banking_command = None

    async def _report_goal(self, ctx, data):
        """Send the goal status once the victory event flag is set."""
        if data["EventFlag"][280] & 1 and not ctx.finished_game:
            await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
            ctx.finished_game = True

    def on_package(self, ctx, cmd, args):
        """React to 'Connected' and 'RoomInfo' packets, then defer to the base class."""
        if cmd == 'Connected':
            if args['slot_data'].get('death_link'):
                self.set_deathlink = True
                self.last_death_link = time.time()
            ctx.set_notify(f"EnergyLink{ctx.team}")
        elif cmd == 'RoomInfo':
            if ctx.seed_name and ctx.seed_name != args["seed_name"]:
                # CommonClient's on_package displays an error to the user in this case, but connection is not cancelled.
                self.game_state = False
                self.disconnect_pending = True
        super().on_package(ctx, cmd, args)
|
|
|
|
|
|
def cmd_bank(self, cmd: str = "", amount: str = ""):
    """Deposit or withdraw money with the server's EnergyLink storage.
    /bank - check server balance.
    /bank deposit # - deposit money. One quarter of the amount will be lost to taxation.
    /bank withdraw # - withdraw money."""
    # Installed as the "bank" command by PokemonRBClient.validate_rom, so
    # ``self`` is the command processor and ``self.ctx`` the client context.
    # The request is only queued here (positive = deposit, negative =
    # withdrawal) in ``client_handler.banking_command``.
    if self.ctx.game != "Pokemon Red and Blue":
        logger.warning("This command can only be used while playing Pokémon Red and Blue")
        return
    if not cmd:
        # The key may be absent or None; report both as a zero balance.
        balance = self.ctx.stored_data.get(f"EnergyLink{self.ctx.team}") or 0
        logger.info(f"Money available: {int(balance // BANK_EXCHANGE_RATE)}")
        return
    if (not self.ctx.server) or self.ctx.server.socket.closed or not self.ctx.client_handler.game_state:
        logger.info("Must be connected to server and in game.")
        return
    if not amount:
        logger.warning("You must specify an amount.")
        return
    if cmd not in ("withdraw", "deposit"):
        logger.warning(f"Invalid bank command {cmd}")
        return

    try:
        value = int(amount)
    except ValueError:
        logger.warning(f"Invalid amount {amount}")
        return
    # Reject non-positive amounts: a negated negative withdrawal would
    # otherwise be queued as a deposit that skips the minimum below.
    if value <= 0:
        logger.warning("The amount must be a positive whole number.")
        return

    if cmd == "withdraw":
        self.ctx.client_handler.banking_command = -value
    elif value < 4:
        logger.warning("You must deposit at least $4, for tax purposes.")
    else:
        self.ctx.client_handler.banking_command = value
|