mirror of
https://github.com/MarioSpore/Grinch-AP.git
synced 2025-10-21 12:11:33 -06:00

* Update UndertaleClient.py: Fixed the output location of the patched game. Fixed the error where, when the client is opened outside of the Archipelago folder, the patched folder would be created there, which on Windows ends up trying to create it in the system32 folder. Bug report: https://discord.com/channels/731205301247803413/1148330675452264499/1237412436382973962 * Undertale: removed unnecessary wrapping in UndertaleClient.py. I did not know os.path.join was unnecessary in this case; the more you know.
513 lines
23 KiB
Python
513 lines
23 KiB
Python
from __future__ import annotations
|
|
import os
|
|
import sys
|
|
import asyncio
|
|
import typing
|
|
import bsdiff4
|
|
import shutil
|
|
|
|
import Utils
|
|
|
|
from NetUtils import NetworkItem, ClientStatus
|
|
from worlds import undertale
|
|
from MultiServer import mark_raw
|
|
from CommonClient import CommonContext, server_loop, \
|
|
gui_enabled, ClientCommandProcessor, logger, get_base_parser
|
|
from Utils import async_start
|
|
|
|
|
|
class UndertaleCommandProcessor(ClientCommandProcessor):
    # Slash commands added by the Undertale client. Every command is a no-op
    # unless the processor is attached to an UndertaleContext.
    #
    # NOTE: the docstrings of the _cmd_* methods are left exactly as they were,
    # because the command framework presumably shows them as help text.

    def __init__(self, ctx):
        super().__init__(ctx)

    def _cmd_resync(self):
        """Manually trigger a resync."""
        if isinstance(self.ctx, UndertaleContext):
            self.output("Syncing items.")
            # The flag is picked up by game_watcher on its next iteration.
            self.ctx.syncing = True

    def _cmd_patch(self):
        """Patch the game. Only use this command if /auto_patch fails."""
        if isinstance(self.ctx, UndertaleContext):
            os.makedirs(name=Utils.user_path("Undertale"), exist_ok=True)
            self.ctx.patch_game()
            self.output("Patched.")

    def _cmd_savepath(self, directory: str):
        """Redirect to proper save data folder. This is necessary for Linux users to use before connecting."""
        if isinstance(self.ctx, UndertaleContext):
            self.ctx.save_game_folder = directory
            self.output("Changed to the following directory: " + self.ctx.save_game_folder)

    @mark_raw
    def _cmd_auto_patch(self, steaminstall: typing.Optional[str] = None):
        """Patch the game automatically."""
        if isinstance(self.ctx, UndertaleContext):
            os.makedirs(name=Utils.user_path("Undertale"), exist_ok=True)

            # Candidate install folders, in priority order: the folder given by
            # the user (if any), then the two default Steam library locations.
            # Building the list first avoids calling os.path.join(None, ...)
            # when the command is run without an argument.
            candidates = []
            if steaminstall:
                candidates.append(steaminstall)
            candidates.append("C:\\Program Files (x86)\\Steam\\steamapps\\common\\Undertale")
            candidates.append("C:\\Program Files\\Steam\\steamapps\\common\\Undertale")

            # A folder only counts as an install if it actually holds data.win.
            install_dir = next(
                (folder for folder in candidates
                 if os.path.isfile(os.path.join(folder, "data.win"))),
                None,
            )

            if install_dir is None:
                self.output("ERROR: Cannot find Undertale. Please rerun the command with the correct folder."
                            " command. \"/auto_patch (Steam directory)\".")
            else:
                # Copy everything except steam_api.dll into the user folder
                # before patching the copy.
                # NOTE(review): shutil.copy raises on sub-directories; this
                # assumes the install folder is flat - confirm.
                for file_name in os.listdir(install_dir):
                    if file_name != "steam_api.dll":
                        shutil.copy(os.path.join(install_dir, file_name),
                                    Utils.user_path("Undertale", file_name))
                self.ctx.patch_game()
                self.output("Patching successful!")

    def _cmd_online(self):
        """Toggles seeing other Undertale players."""
        if isinstance(self.ctx, UndertaleContext):
            self.ctx.update_online_mode("Online" not in self.ctx.tags)
            if "Online" in self.ctx.tags:
                self.output("Now online.")
            else:
                self.output("Now offline.")

    def _cmd_deathlink(self):
        """Toggles deathlink"""
        if isinstance(self.ctx, UndertaleContext):
            self.ctx.deathlink_status = not self.ctx.deathlink_status
            if self.ctx.deathlink_status:
                self.output("Deathlink enabled.")
            else:
                self.output("Deathlink disabled.")
|
|
|
|
|
|
class UndertaleContext(CommonContext):
    """Client context holding the Undertale-specific connection state.

    Data is exchanged with the running game through small files placed in
    ``save_game_folder``.
    """

    tags = {"AP", "Online"}
    game = "Undertale"
    command_processor = UndertaleCommandProcessor
    items_handling = 0b111
    route = None
    pieces_needed = None
    completed_routes = None
    completed_count = 0
    save_game_folder = os.path.expandvars(r"%localappdata%/UNDERTALE")

    def __init__(self, server_address, password):
        super().__init__(server_address, password)
        self.game = "Undertale"
        self.pieces_needed = 0
        self.completed_count = 0
        self.completed_routes = {"pacifist": 0, "genocide": 0, "neutral": 0}
        self.finished_game = False
        self.syncing = False
        self.got_deathlink = False
        self.deathlink_status = False
        self.tem_armor = False
        # self.save_game_folder: files go in this path to pass data between us and the actual game
        self.save_game_folder = os.path.expandvars(r"%localappdata%/UNDERTALE")

    def patch_game(self):
        """Apply the bundled bsdiff patch to the copied data.win, in place,
        and create the "Custom Sprites" folder with its selector file."""
        data_win = Utils.user_path("Undertale", "data.win")
        with open(data_win, "rb") as unpatched:
            patched_bytes = bsdiff4.patch(unpatched.read(), undertale.data_path("patch.bsdiff"))
        with open(data_win, "wb") as target:
            target.write(patched_bytes)

        os.makedirs(name=Utils.user_path("Undertale", "Custom Sprites"), exist_ok=True)
        selector_path = os.path.expandvars(
            Utils.user_path("Undertale", "Custom Sprites", "Which Character.txt"))
        selector_lines = [
            "// Put the folder name of the sprites you want to play as, make sure it is the only "
            "line other than this one.\n",
            "frisk",
        ]
        with open(selector_path, "w") as selector:
            selector.writelines(selector_lines)

    async def server_auth(self, password_requested: bool = False):
        """Ask for a password only when one is requested and none is stored,
        then obtain the username and send the connect request."""
        needs_password = password_requested and not self.password
        if needs_password:
            await super().server_auth(password_requested)
        await self.get_username()
        await self.send_connect()

    def clear_undertale_files(self):
        """Reset ``finished_game`` and delete the exchange files found
        anywhere under the save folder."""
        exact_names = ("check.spot", "scout")
        removable_suffixes = (".item", ".victory", ".route", ".playerspot", ".mad",
                              ".youDied", ".LV", ".mine", ".flag", ".hint")
        folder = self.save_game_folder
        self.finished_game = False
        for current_dir, _subdirs, names in os.walk(folder):
            for name in names:
                if name in exact_names or name.endswith(removable_suffixes):
                    os.remove(os.path.join(current_dir, name))

    async def connect(self, address: typing.Optional[str] = None):
        """Clear stale exchange files, then connect."""
        self.clear_undertale_files()
        await super().connect(address)

    async def disconnect(self, allow_autoreconnect: bool = False):
        """Clear exchange files, then disconnect."""
        self.clear_undertale_files()
        await super().disconnect(allow_autoreconnect)

    async def connection_closed(self):
        """Clear exchange files, then run the base close handling."""
        self.clear_undertale_files()
        await super().connection_closed()

    async def shutdown(self):
        """Clear exchange files, then shut the client down."""
        self.clear_undertale_files()
        await super().shutdown()

    def update_online_mode(self, online):
        """Add or remove the "Online" tag; when the tag set changed and the
        server socket is open, send the new tags with a ConnectUpdate."""
        previous_tags = self.tags.copy()
        if online:
            self.tags.add("Online")
        else:
            self.tags -= {"Online"}
        tags_changed = previous_tags != self.tags
        if tags_changed and self.server and not self.server.socket.closed:
            async_start(self.send_msgs([{"cmd": "ConnectUpdate", "tags": self.tags}]))

    def on_package(self, cmd: str, args: dict):
        """Record the slot's game on "Connected" and hand every package to
        process_undertale_cmd as a background coroutine."""
        if cmd == "Connected":
            self.game = self.slot_info[self.slot].game
        async_start(process_undertale_cmd(self, cmd, args))

    def run_gui(self):
        """Create the Kivy UI for this client and start it as a task."""
        from kvui import GameManager

        class UTManager(GameManager):
            logging_pairs = [
                ("Client", "Archipelago")
            ]
            base_title = "Archipelago Undertale Client"

        self.ui = UTManager(self)
        self.ui_task = asyncio.create_task(self.ui.async_run(), name="UI")

    def on_deathlink(self, data: typing.Dict[str, typing.Any]):
        """Flag the received death so game_watcher can relay it to the game,
        then run the base handling."""
        self.got_deathlink = True
        super().on_deathlink(data)
|
|
|
|
|
|
# Place names mapped to the game room identifiers returned by to_room_name.
_ROOM_NAME_BY_PLACE = {
    "Old Home Exit": "room_ruinsexit",
    "Snowdin Forest": "room_tundra1",
    "Snowdin Town Exit": "room_fogroom",
    "Waterfall": "room_water1",
    "Waterfall Exit": "room_fire2",
    "Hotland": "room_fire_prelab",
    "Hotland Exit": "room_fire_precore",
    "Core": "room_fire_core1",
}


def to_room_name(place_name: str) -> typing.Optional[str]:
    """Return the room identifier for *place_name*.

    Returns ``None`` when the place name is not one of the known entries.
    """
    return _ROOM_NAME_BY_PLACE.get(place_name)
|
|
|
|
|
|
# Offsets subtracted from item / location ids before they are written to the
# exchange files.
_UT_ITEM_ID_OFFSET = 11000
_UT_LOCATION_ID_OFFSET = 12000

# Route names tracked in the "<slot> RoutesDone <route>" data-storage keys.
_UT_ROUTE_NAMES = ("neutral", "pacifist", "genocide")


def _ut_touch(path):
    """Create (or truncate) an empty marker file at *path*."""
    with open(path, "w"):
        pass


def _ut_weapon_for_tier(tier, route):
    """Return the item id written for the *tier*-th (0-based) item 77701."""
    fixed_tiers = (77013, 77014, 77025, 77045, 77049, 77047)
    if 0 <= tier < len(fixed_tiers):
        return fixed_tiers[tier]
    if tier == 6:
        return 77052 if route in ("genocide", "all_routes") else 77051
    return 77003


def _ut_armor_for_tier(tier, route, tem_armor):
    """Return the item id written for the *tier*-th (0-based) item 77702."""
    fixed_tiers = (77012, 77015, 77024, 77044, 77048)
    genocide = route == "genocide"
    if 0 <= tier < len(fixed_tiers):
        return fixed_tiers[tier]
    if tier == 5:
        return 77053 if genocide else 77046
    if tier == 6 and (not genocide or tem_armor):
        if route == "all_routes":
            return 77053
        if genocide:
            return 77064
        return 77050
    if tier == 7 and tem_armor and not genocide:
        return 77064
    return 77004


async def process_undertale_cmd(ctx: UndertaleContext, cmd: str, args: dict):
    """Translate one server package into files in ``ctx.save_game_folder``.

    Handles "Connected", "LocationInfo", "Retrieved", "SetReply",
    "ReceivedItems", "RoomUpdate" and "Bounced"; any other command is ignored.

    :param ctx: the client context (save folder, slot, route state, ...).
    :param cmd: the package's command name.
    :param args: the decoded package.
    """
    if cmd == "Connected":
        # makedirs also creates missing parent folders and tolerates the
        # folder appearing between the check and the creation.
        os.makedirs(ctx.save_game_folder, exist_ok=True)
        slot_data = args["slot_data"]
        ctx.route = slot_data["route"]
        ctx.pieces_needed = slot_data["key_pieces"]
        ctx.tem_armor = slot_data["temy_armor_include"]

        route_keys = [f"{ctx.slot} RoutesDone {name}" for name in _UT_ROUTE_NAMES]
        await ctx.send_msgs([{"cmd": "Get", "keys": route_keys}])
        await ctx.send_msgs([{"cmd": "SetNotify", "keys": list(route_keys)}])

        # Empty marker files: only their presence and name carry information.
        if slot_data["only_flakes"]:
            _ut_touch(os.path.join(ctx.save_game_folder, "GenoNoChest.flag"))
        if not slot_data["key_hunt"]:
            ctx.pieces_needed = 0
        if slot_data["rando_love"]:
            _ut_touch(os.path.join(ctx.save_game_folder, "LOVErando.LV"))
        if slot_data["rando_stats"]:
            _ut_touch(os.path.join(ctx.save_game_folder, "STATrando.LV"))
        _ut_touch(os.path.join(ctx.save_game_folder, f"{ctx.route}.route"))

        # Append the already-checked locations, one offset id per line.
        with open(os.path.join(ctx.save_game_folder, "check.spot"), "a") as f:
            for location_id in set(args["checked_locations"]):
                f.write(str(location_id - _UT_LOCATION_ID_OFFSET) + "\n")

    elif cmd == "LocationInfo":
        for scouted in args["locations"]:
            filename = f"{scouted.location - _UT_LOCATION_ID_OFFSET}.hint"
            # Only the first 20 characters of the item name are written.
            item_name = str(ctx.item_names.lookup_in_game(scouted.item))
            with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
                f.write(item_name[:20])

    elif cmd == "Retrieved":
        for route_name in ("neutral", "genocide", "pacifist"):
            stored = args["keys"].get(f"{ctx.slot} RoutesDone {route_name}")
            if stored is not None:
                ctx.completed_routes[route_name] = stored

    elif cmd == "SetReply":
        if args["value"] is not None:
            for route_name in ("pacifist", "genocide", "neutral"):
                if args["key"] == f"{ctx.slot} RoutesDone {route_name}":
                    ctx.completed_routes[route_name] = args["value"]
                    break

    elif cmd == "ReceivedItems":
        start_index = args["index"]

        if start_index == 0:
            ctx.items_received = []
        elif start_index != len(ctx.items_received):
            # Out of step with the server: request a full resend.
            sync_msg = [{"cmd": "Sync"}]
            if ctx.locations_checked:
                sync_msg.append({"cmd": "LocationChecks",
                                 "locations": list(ctx.locations_checked)})
            await ctx.send_msgs(sync_msg)
        if start_index == len(ctx.items_received):
            route = str(ctx.route)
            counter = -1
            # NOTE(review): these tier counters restart at zero for every
            # packet, so tiers only advance within one ReceivedItems message.
            # Kept as in the original - confirm this is intended.
            placed_weapons = 0
            placed_armors = 0
            for raw_item in args["items"]:
                item = NetworkItem(*raw_item)

                # Items with a negative location get a file id strictly below
                # the running counter so that they do not share a file name
                # within this packet.
                file_id = item.location
                if item.location < 0:
                    file_id = min(file_id, counter - 1)
                    counter -= 1

                if item.item == 77701:
                    game_item = _ut_weapon_for_tier(placed_weapons, route)
                    placed_weapons += 1
                elif item.item == 77702:
                    game_item = _ut_armor_for_tier(placed_armors, route, ctx.tem_armor)
                    placed_armors += 1
                else:
                    game_item = item.item

                filename = f"{file_id}PLR{item.player}.item"
                with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
                    f.write(str(game_item - _UT_ITEM_ID_OFFSET))
                ctx.items_received.append(item)

            # Once enough items 77000 were received (and a positive number is
            # required), write the two extra item files. Counted once after
            # the loop instead of rescanning the whole list per item.
            pieces_found = sum(1 for received in ctx.items_received if received.item == 77000)
            if pieces_found >= ctx.pieces_needed > 0:
                with open(os.path.join(ctx.save_game_folder, "-99999PLR0.item"), "w") as f:
                    f.write(str(77787 - _UT_ITEM_ID_OFFSET))
                with open(os.path.join(ctx.save_game_folder, "-99998PLR0.item"), "w") as f:
                    f.write(str(77789 - _UT_ITEM_ID_OFFSET))
        ctx.watcher_event.set()

    elif cmd == "RoomUpdate":
        if "checked_locations" in args:
            with open(os.path.join(ctx.save_game_folder, "check.spot"), "a") as f:
                for location_id in set(args["checked_locations"]):
                    f.write(str(location_id - _UT_LOCATION_ID_OFFSET) + "\n")

    elif cmd == "Bounced":
        tags = args.get("tags", [])
        if "Online" in tags:
            data = args.get("data", {})
            # .get: a bounce without a player entry is skipped instead of
            # raising KeyError.
            player = data.get("player")
            if player != ctx.slot and player is not None:
                filename = f"FRISK{player}.playerspot"
                with open(os.path.join(ctx.save_game_folder, filename), "w") as f:
                    f.write(str(data["x"]) + str(data["y"]) + str(data["room"]) + str(
                        data["spr"]) + str(data["frm"]))
|
|
|
|
|
|
async def multi_watcher(ctx: UndertaleContext):
    """Relay this player's position to other clients until exit is requested.

    Roughly every 0.1 s the save folder is walked; while the "Online" tag is
    set, each file whose name contains "spots.mine" has its first five lines
    read and sent in an "Online"-tagged Bounce.
    """
    while not ctx.exit_event.is_set():
        for folder, _subdirs, names in os.walk(ctx.save_game_folder):
            for name in names:
                if not ("spots.mine" in name and "Online" in ctx.tags):
                    continue
                # Lines are forwarded verbatim, trailing newlines included.
                with open(os.path.join(folder, name), "r") as spot_file:
                    this_x, this_y, this_room, this_sprite, this_frame = (
                        spot_file.readline() for _ in range(5)
                    )

                payload = {"player": ctx.slot, "x": this_x, "y": this_y, "room": this_room,
                           "spr": this_sprite, "frm": this_frame}
                await ctx.send_msgs([{"cmd": "Bounce", "tags": ["Online"], "data": payload}])

        await asyncio.sleep(0.1)
|
|
|
|
|
|
async def game_watcher(ctx: UndertaleContext):
    """Poll the save folder and relay the game's exchange files to the server.

    Runs until ``ctx.exit_event`` is set, with a 0.1 s pause per iteration:

    * applies the current deathlink setting;
    * on ``ctx.syncing``: deletes the ``.item`` files and sends a Sync;
    * on ``ctx.got_deathlink``: writes the ``WelcomeToTheDead.youDied`` marker;
    * walks the save folder and reacts to the files found there (death
      marker, scout requests, checked locations, victory markers, leftover
      ``.playerspot`` files while offline);
    * sends the goal status once victory is detected.
    """
    while not ctx.exit_event.is_set():
        await ctx.update_death_link(ctx.deathlink_status)
        path = ctx.save_game_folder

        if ctx.syncing:
            for root, _dirs, files in os.walk(path):
                for file in files:
                    if ".item" in file:
                        os.remove(os.path.join(root, file))
            sync_msg = [{"cmd": "Sync"}]
            if ctx.locations_checked:
                sync_msg.append({"cmd": "LocationChecks", "locations": list(ctx.locations_checked)})
            await ctx.send_msgs(sync_msg)
            ctx.syncing = False

        if ctx.got_deathlink:
            ctx.got_deathlink = False
            # Empty marker file; only its presence matters.
            with open(os.path.join(ctx.save_game_folder, "WelcomeToTheDead.youDied"), "w"):
                pass

        # Checked locations are kept apart from scouted ones so that a scout
        # file listed after check.spot cannot end up in ctx.locations_checked.
        checked = []
        victory = False
        for root, _dirs, files in os.walk(path):
            for file in files:
                full_path = os.path.join(root, file)

                if "DontBeMad.mad" in file:
                    os.remove(full_path)
                    if "DeathLink" in ctx.tags:
                        await ctx.send_death()

                if "scout" == file:
                    scouted = []
                    try:
                        with open(full_path, "r") as f:
                            for line in f:
                                location = int(line) + 12000
                                if location in ctx.server_locations:
                                    scouted.append(location)
                    finally:
                        # Sent and removed even if a line failed to parse.
                        await ctx.send_msgs([{"cmd": "LocationScouts", "locations": scouted,
                                              "create_as_hint": 2}])
                        os.remove(full_path)

                if "check.spot" in file:
                    checked = []
                    try:
                        with open(full_path, "r") as f:
                            for line in f:
                                checked.append(int(line.rstrip('\n')) + 12000)
                    finally:
                        await ctx.send_msgs([{"cmd": "LocationChecks", "locations": checked}])

                if "victory" in file and str(ctx.route) in file:
                    victory = True

                if ".playerspot" in file and "Online" not in ctx.tags:
                    os.remove(full_path)

                if "victory" in file and str(ctx.route) == "all_routes":
                    # Record the first route named by the file that is not
                    # yet marked as done in the server's data storage.
                    for route_name in ("neutral", "pacifist", "genocide"):
                        if route_name in file and ctx.completed_routes[route_name] != 1:
                            await ctx.send_msgs([{"cmd": "Set",
                                                  "key": str(ctx.slot) + " RoutesDone " + route_name,
                                                  "default": 0, "want_reply": True,
                                                  "operations": [{"operation": "max", "value": 1}]}])
                            break

        if str(ctx.route) == "all_routes":
            found_routes = (ctx.completed_routes["neutral"]
                            + ctx.completed_routes["pacifist"]
                            + ctx.completed_routes["genocide"])
            if found_routes >= 3:
                victory = True

        ctx.locations_checked = checked
        if (not ctx.finished_game) and victory:
            await ctx.send_msgs([{"cmd": "StatusUpdate", "status": ClientStatus.CLIENT_GOAL}])
            ctx.finished_game = True
        await asyncio.sleep(0.1)
|
|
|
|
|
|
def main():
    """Start the Undertale client and run it until the user exits.

    Sets up logging, then runs the server loop, both file watchers and the
    GUI/CLI inside one asyncio event loop.
    """
    Utils.init_logging("UndertaleClient", exception_logger="Client")

    async def _main():
        ctx = UndertaleContext(None, None)
        ctx.server_task = asyncio.create_task(server_loop(ctx), name="server loop")
        # The event loop only keeps weak references to tasks, so hold strong
        # references for as long as this coroutine runs.
        watcher_tasks = [
            asyncio.create_task(
                game_watcher(ctx), name="UndertaleProgressionWatcher"),
            asyncio.create_task(
                multi_watcher(ctx), name="UndertaleMultiplayerWatcher"),
        ]

        if gui_enabled:
            ctx.run_gui()
        ctx.run_cli()

        await ctx.exit_event.wait()
        await ctx.shutdown()
        # Both watchers poll ctx.exit_event and stop by themselves; the
        # references can be dropped now.
        watcher_tasks.clear()

    import colorama

    colorama.init()
    try:
        asyncio.run(_main())
    finally:
        # Undo colorama's stream wrapping even if the event loop raised.
        colorama.deinit()
|
|
|
|
|
|
if __name__ == "__main__":
    # Parse the standard client command line (this also provides --help),
    # then start the client. The parsed values are not consumed by main().
    args = get_base_parser(
        description="Undertale Client, for text interfacing."
    ).parse_args()
    main()
|