Files
Grinch-AP/UndertaleClient.py
Mewlif aff852fb45 Undertale: Various Fixes (#2146)
* Changed the pathing code to use os.path.join, instead of adding strings together, also fixed the savepath command using UndertaleContext instead of self.ctx (Credit to Wackerly for finding the self.ctx issue and fix)

* Undertale: Fixed a debug function in the game not requiring debug to be enabled.

* Undetale: Fixed a logic bug with the location "Letter Quest"
2023-09-20 10:18:53 +02:00

513 lines
23 KiB
Python

from __future__ import annotations
import os
import sys
import asyncio
import typing
import bsdiff4
import shutil
import Utils
from NetUtils import NetworkItem, ClientStatus
from worlds import undertale
from MultiServer import mark_raw
from CommonClient import CommonContext, server_loop, \
gui_enabled, ClientCommandProcessor, logger, get_base_parser
from Utils import async_start
class UndertaleCommandProcessor(ClientCommandProcessor):
def __init__(self, ctx):
super().__init__(ctx)
def _cmd_resync(self):
"""Manually trigger a resync."""
if isinstance(self.ctx, UndertaleContext):
self.output(f"Syncing items.")
self.ctx.syncing = True
def _cmd_patch(self):
"""Patch the game."""
if isinstance(self.ctx, UndertaleContext):
os.makedirs(name=os.path.join(os.getcwd(), "Undertale"), exist_ok=True)
self.ctx.patch_game()
self.output("Patched.")
def _cmd_savepath(self, directory: str):
"""Redirect to proper save data folder. (Use before connecting!)"""
if isinstance(self.ctx, UndertaleContext):
self.ctx.save_game_folder = directory
self.output("Changed to the following directory: " + self.ctx.save_game_folder)
@mark_raw
def _cmd_auto_patch(self, steaminstall: typing.Optional[str] = None):
"""Patch the game automatically."""
if isinstance(self.ctx, UndertaleContext):
os.makedirs(name=os.path.join(os.getcwd(), "Undertale"), exist_ok=True)
tempInstall = steaminstall
if not os.path.isfile(os.path.join(tempInstall, "data.win")):
tempInstall = None
if tempInstall is None:
tempInstall = "C:\\Program Files (x86)\\Steam\\steamapps\\common\\Undertale"
if not os.path.exists(tempInstall):
tempInstall = "C:\\Program Files\\Steam\\steamapps\\common\\Undertale"
elif not os.path.exists(tempInstall):
tempInstall = "C:\\Program Files (x86)\\Steam\\steamapps\\common\\Undertale"
if not os.path.exists(tempInstall):
tempInstall = "C:\\Program Files\\Steam\\steamapps\\common\\Undertale"
if not os.path.exists(tempInstall) or not os.path.exists(tempInstall) or not os.path.isfile(os.path.join(tempInstall, "data.win")):
self.output("ERROR: Cannot find Undertale. Please rerun the command with the correct folder."
" command. \"/auto_patch (Steam directory)\".")
else:
for file_name in os.listdir(tempInstall):
if file_name != "steam_api.dll":
shutil.copy(os.path.join(tempInstall, file_name),
os.path.join(os.getcwd(), "Undertale", file_name))
self.ctx.patch_game()
self.output("Patching successful!")
def _cmd_online(self):
"""Makes you no longer able to see other Undertale players."""
if isinstance(self.ctx, UndertaleContext):
self.ctx.update_online_mode(not ("Online" in self.ctx.tags))
if "Online" in self.ctx.tags:
self.output(f"Now online.")
else:
self.output(f"Now offline.")
def _cmd_deathlink(self):
"""Toggles deathlink"""
if isinstance(self.ctx, UndertaleContext):
self.ctx.deathlink_status = not self.ctx.deathlink_status
if self.ctx.deathlink_status:
self.output(f"Deathlink enabled.")
else:
self.output(f"Deathlink disabled.")
class UndertaleContext(CommonContext):
tags = {"AP", "Online"}
game = "Undertale"
command_processor = UndertaleCommandProcessor
items_handling = 0b111
route = None
pieces_needed = None
completed_routes = None
completed_count = 0
save_game_folder = os.path.expandvars(r"%localappdata%/UNDERTALE")
def __init__(self, server_address, password):
super().__init__(server_address, password)
self.pieces_needed = 0
self.finished_game = False
self.game = "Undertale"
self.got_deathlink = False
self.syncing = False
self.deathlink_status = False
self.tem_armor = False
self.completed_count = 0
self.completed_routes = {"pacifist": 0, "genocide": 0, "neutral": 0}
# self.save_game_folder: files go in this path to pass data between us and the actual game
self.save_game_folder = os.path.expandvars(r"%localappdata%/UNDERTALE")
def patch_game(self):
with open(os.path.join(os.getcwd(), "Undertale", "data.win"), "rb") as f:
patchedFile = bsdiff4.patch(f.read(), undertale.data_path("patch.bsdiff"))
with open(os.path.join(os.getcwd(), "Undertale", "data.win"), "wb") as f:
f.write(patchedFile)
os.makedirs(name=os.path.join(os.getcwd(), "Undertale", "Custom Sprites"), exist_ok=True)
with open(os.path.expandvars(os.path.join(os.getcwd(), "Undertale", "Custom Sprites",
"Which Character.txt")), "w") as f:
f.writelines(["// Put the folder name of the sprites you want to play as, make sure it is the only "
"line other than this one.\n", "frisk"])
f.close()
async def server_auth(self, password_requested: bool = False):
if password_requested and not self.password:
await super().server_auth(password_requested)
await self.get_username()
await self.send_connect()
def clear_undertale_files(self):
path = self.save_game_folder
self.finished_game = False
for root, dirs, files in os.walk(path):
for file in files:
if "check.spot" == file or "scout" == file:
os.remove(os.path.join(root, file))
elif file.endswith((".item", ".victory", ".route", ".playerspot", ".mad",
".youDied", ".LV", ".mine", ".flag", ".hint")):
os.remove(os.path.join(root, file))
async def connect(self, address: typing.Optional[str] = None):
self.clear_undertale_files()
await super().connect(address)
async def disconnect(self, allow_autoreconnect: bool = False):
self.clear_undertale_files()
await super().disconnect(allow_autoreconnect)
async def connection_closed(self):
self.clear_undertale_files()
await super().connection_closed()
async def shutdown(self):
self.clear_undertale_files()
await super().shutdown()
def update_online_mode(self, online):
old_tags = self.tags.copy()
if online:
self.tags.add("Online")
else:
self.tags -= {"Online"}
if old_tags != self.tags and self.server and not self.server.socket.closed:
async_start(self.send_msgs([{"cmd": "ConnectUpdate", "tags": self.tags}]))
def on_package(self, cmd: str, args: dict):
if cmd == "Connected":
self.game = self.slot_info[self.slot].game
async_start(process_undertale_cmd(self, cmd, args))
def run_gui(self):
from kvui import GameManager
class UTManager(GameManager):
logging_pairs = [
("Client", "Archipelago")
]
base_title = "Archipelago Undertale Client"
self.ui = UTManager(self)
self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")
def on_deathlink(self, data: typing.Dict[str, typing.Any]):
self.got_deathlink = True
super().on_deathlink(data)
def to_room_name(place_name: str):
if place_name == "Old Home Exit":
return "room_ruinsexit"
elif place_name == "Snowdin Forest":
return "room_tundra1"
elif place_name == "Snowdin Town Exit":
return "room_fogroom"
elif place_name == "Waterfall":
return "room_water1"
elif place_name == "Waterfall Exit":
return "room_fire2"
elif place_name == "Hotland":
return "room_fire_prelab"
elif place_name == "Hotland Exit":
return "room_fire_precore"
elif place_name == "Core":
return "room_fire_core1"
async def process_undertale_cmd(ctx: UndertaleContext, cmd: str, args: dict):
if cmd == "Connected":
if not os.path.exists(ctx.save_game_folder):
os.mkdir(ctx.save_game_folder)
ctx.route = args["slot_data"]["route"]
ctx.pieces_needed = args["slot_data"]["key_pieces"]
ctx.tem_armor = args["slot_data"]["temy_armor_include"]
await ctx.send_msgs([{"cmd": "Get", "keys": [str(ctx.slot)+" RoutesDone neutral",
str(ctx.slot)+" RoutesDone pacifist",
str(ctx.slot)+" RoutesDone genocide"]}])
await ctx.send_msgs([{"cmd": "SetNotify", "keys": [str(ctx.slot)+" RoutesDone neutral",
str(ctx.slot)+" RoutesDone pacifist",
str(ctx.slot)+" RoutesDone genocide"]}])
if args["slot_data"]["only_flakes"]:
with open(os.path.join(ctx.save_game_folder, "GenoNoChest.flag"), "w") as f:
f.close()
if not args["slot_data"]["key_hunt"]:
ctx.pieces_needed = 0
if args["slot_data"]["rando_love"]:
filename = f"LOVErando.LV"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
f.close()
if args["slot_data"]["rando_stats"]:
filename = f"STATrando.LV"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
f.close()
filename = f"{ctx.route}.route"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
f.close()
filename = f"check.spot"
with open(os.path.join(ctx.save_game_folder, filename), "a") as f:
for ss in set(args["checked_locations"]):
f.write(str(ss-12000)+"\n")
f.close()
elif cmd == "LocationInfo":
for l in args["locations"]:
locationid = l.location
filename = f"{str(locationid-12000)}.hint"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
toDraw = ""
for i in range(20):
if i < len(str(ctx.item_names[l.item])):
toDraw += str(ctx.item_names[l.item])[i]
else:
break
f.write(toDraw)
f.close()
elif cmd == "Retrieved":
if str(ctx.slot)+" RoutesDone neutral" in args["keys"]:
if args["keys"][str(ctx.slot)+" RoutesDone neutral"] is not None:
ctx.completed_routes["neutral"] = args["keys"][str(ctx.slot)+" RoutesDone neutral"]
if str(ctx.slot)+" RoutesDone genocide" in args["keys"]:
if args["keys"][str(ctx.slot)+" RoutesDone genocide"] is not None:
ctx.completed_routes["genocide"] = args["keys"][str(ctx.slot)+" RoutesDone genocide"]
if str(ctx.slot)+" RoutesDone pacifist" in args["keys"]:
if args["keys"][str(ctx.slot) + " RoutesDone pacifist"] is not None:
ctx.completed_routes["pacifist"] = args["keys"][str(ctx.slot)+" RoutesDone pacifist"]
elif cmd == "SetReply":
if args["value"] is not None:
if str(ctx.slot)+" RoutesDone pacifist" == args["key"]:
ctx.completed_routes["pacifist"] = args["value"]
elif str(ctx.slot)+" RoutesDone genocide" == args["key"]:
ctx.completed_routes["genocide"] = args["value"]
elif str(ctx.slot)+" RoutesDone neutral" == args["key"]:
ctx.completed_routes["neutral"] = args["value"]
elif cmd == "ReceivedItems":
start_index = args["index"]
if start_index == 0:
ctx.items_received = []
elif start_index != len(ctx.items_received):
sync_msg = [{"cmd": "Sync"}]
if ctx.locations_checked:
sync_msg.append({"cmd": "LocationChecks",
"locations": list(ctx.locations_checked)})
await ctx.send_msgs(sync_msg)
if start_index == len(ctx.items_received):
counter = -1
placedWeapon = 0
placedArmor = 0
for item in args["items"]:
id = NetworkItem(*item).location
while NetworkItem(*item).location < 0 and \
counter <= id:
id -= 1
if NetworkItem(*item).location < 0:
counter -= 1
filename = f"{str(id)}PLR{str(NetworkItem(*item).player)}.item"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
if NetworkItem(*item).item == 77701:
if placedWeapon == 0:
f.write(str(77013-11000))
elif placedWeapon == 1:
f.write(str(77014-11000))
elif placedWeapon == 2:
f.write(str(77025-11000))
elif placedWeapon == 3:
f.write(str(77045-11000))
elif placedWeapon == 4:
f.write(str(77049-11000))
elif placedWeapon == 5:
f.write(str(77047-11000))
elif placedWeapon == 6:
if str(ctx.route) == "genocide" or str(ctx.route) == "all_routes":
f.write(str(77052-11000))
else:
f.write(str(77051-11000))
else:
f.write(str(77003-11000))
placedWeapon += 1
elif NetworkItem(*item).item == 77702:
if placedArmor == 0:
f.write(str(77012-11000))
elif placedArmor == 1:
f.write(str(77015-11000))
elif placedArmor == 2:
f.write(str(77024-11000))
elif placedArmor == 3:
f.write(str(77044-11000))
elif placedArmor == 4:
f.write(str(77048-11000))
elif placedArmor == 5:
if str(ctx.route) == "genocide":
f.write(str(77053-11000))
else:
f.write(str(77046-11000))
elif placedArmor == 6 and ((not str(ctx.route) == "genocide") or ctx.tem_armor):
if str(ctx.route) == "all_routes":
f.write(str(77053-11000))
elif str(ctx.route) == "genocide":
f.write(str(77064-11000))
else:
f.write(str(77050-11000))
elif placedArmor == 7 and ctx.tem_armor and not str(ctx.route) == "genocide":
f.write(str(77064-11000))
else:
f.write(str(77004-11000))
placedArmor += 1
else:
f.write(str(NetworkItem(*item).item-11000))
f.close()
ctx.items_received.append(NetworkItem(*item))
if [item.item for item in ctx.items_received].count(77000) >= ctx.pieces_needed > 0:
filename = f"{str(-99999)}PLR{str(0)}.item"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
f.write(str(77787 - 11000))
f.close()
filename = f"{str(-99998)}PLR{str(0)}.item"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
f.write(str(77789 - 11000))
f.close()
ctx.watcher_event.set()
elif cmd == "RoomUpdate":
if "checked_locations" in args:
filename = f"check.spot"
with open(os.path.join(ctx.save_game_folder, filename), "a") as f:
for ss in set(args["checked_locations"]):
f.write(str(ss-12000)+"\n")
f.close()
elif cmd == "Bounced":
tags = args.get("tags", [])
if "Online" in tags:
data = args.get("data", {})
if data["player"] != ctx.slot and data["player"] is not None:
filename = f"FRISK" + str(data["player"]) + ".playerspot"
with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
f.write(str(data["x"]) + str(data["y"]) + str(data["room"]) + str(
data["spr"]) + str(data["frm"]))
f.close()
async def multi_watcher(ctx: UndertaleContext):
while not ctx.exit_event.is_set():
path = ctx.save_game_folder
for root, dirs, files in os.walk(path):
for file in files:
if "spots.mine" in file and "Online" in ctx.tags:
with open(os.path.join(root, file), "r") as mine:
this_x = mine.readline()
this_y = mine.readline()
this_room = mine.readline()
this_sprite = mine.readline()
this_frame = mine.readline()
mine.close()
message = [{"cmd": "Bounce", "tags": ["Online"],
"data": {"player": ctx.slot, "x": this_x, "y": this_y, "room": this_room,
"spr": this_sprite, "frm": this_frame}}]
await ctx.send_msgs(message)
await asyncio.sleep(0.1)
async def game_watcher(ctx: UndertaleContext):
while not ctx.exit_event.is_set():
await ctx.update_death_link(ctx.deathlink_status)
path = ctx.save_game_folder
if ctx.syncing:
for root, dirs, files in os.walk(path):
for file in files:
if ".item" in file:
os.remove(os.path.join(root, file))
sync_msg = [{"cmd": "Sync"}]
if ctx.locations_checked:
sync_msg.append({"cmd": "LocationChecks", "locations": list(ctx.locations_checked)})
await ctx.send_msgs(sync_msg)
ctx.syncing = False
if ctx.got_deathlink:
ctx.got_deathlink = False
with open(os.path.join(ctx.save_game_folder, "WelcomeToTheDead.youDied"), "w") as f:
f.close()
sending = []
victory = False
found_routes = 0
for root, dirs, files in os.walk(path):
for file in files:
if "DontBeMad.mad" in file:
os.remove(os.path.join(root, file))
if "DeathLink" in ctx.tags:
await ctx.send_death()
if "scout" == file:
sending = []
try:
with open(os.path.join(root, file), "r") as f:
lines = f.readlines()
for l in lines:
if ctx.server_locations.__contains__(int(l)+12000):
sending = sending + [int(l.rstrip('\n'))+12000]
finally:
await ctx.send_msgs([{"cmd": "LocationScouts", "locations": sending,
"create_as_hint": int(2)}])
os.remove(os.path.join(root, file))
if "check.spot" in file:
sending = []
try:
with open(os.path.join(root, file), "r") as f:
lines = f.readlines()
for l in lines:
sending = sending+[(int(l.rstrip('\n')))+12000]
finally:
await ctx.send_msgs([{"cmd": "LocationChecks", "locations": sending}])
if "victory" in file and str(ctx.route) in file:
victory = True
if ".playerspot" in file and "Online" not in ctx.tags:
os.remove(os.path.join(root, file))
if "victory" in file:
if str(ctx.route) == "all_routes":
if "neutral" in file and ctx.completed_routes["neutral"] != 1:
await ctx.send_msgs([{"cmd": "Set", "key": str(ctx.slot)+" RoutesDone neutral",
"default": 0, "want_reply": True, "operations": [{"operation": "max",
"value": 1}]}])
elif "pacifist" in file and ctx.completed_routes["pacifist"] != 1:
await ctx.send_msgs([{"cmd": "Set", "key": str(ctx.slot)+" RoutesDone pacifist",
"default": 0, "want_reply": True, "operations": [{"operation": "max",
"value": 1}]}])
elif "genocide" in file and ctx.completed_routes["genocide"] != 1:
await ctx.send_msgs([{"cmd": "Set", "key": str(ctx.slot)+" RoutesDone genocide",
"default": 0, "want_reply": True, "operations": [{"operation": "max",
"value": 1}]}])
if str(ctx.route) == "all_routes":
found_routes += ctx.completed_routes["neutral"]
found_routes += ctx.completed_routes["pacifist"]
found_routes += ctx.completed_routes["genocide"]
if str(ctx.route) == "all_routes" and found_routes >= 3:
victory = True
ctx.locations_checked = sending
if (not ctx.finished_game) and victory:
await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
ctx.finished_game = True
await asyncio.sleep(0.1)
def main():
Utils.init_logging("UndertaleClient", exception_logger="Client")
async def _main():
ctx = UndertaleContext(None, None)
ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
asyncio.create_task(
game_watcher(ctx), name="UndertaleProgressionWatcher")
asyncio.create_task(
multi_watcher(ctx), name="UndertaleMultiplayerWatcher")
if gui_enabled:
ctx.run_gui()
ctx.run_cli()
await ctx.exit_event.wait()
await ctx.shutdown()
import colorama
colorama.init()
asyncio.run(_main())
colorama.deinit()
if __name__ == "__main__":
parser = get_base_parser(description="Undertale Client, for text interfacing.")
args = parser.parse_args()
main()