304 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			304 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import json
 | |
| import os
 | |
| import platform
 | |
| import struct
 | |
| import subprocess
 | |
| import copy
 | |
| import threading
 | |
| from .Utils import subprocess_args, data_path, get_version_bytes, __version__
 | |
| from Utils import user_path
 | |
| from .ntype import BigStream
 | |
| from .crc import calculate_crc
 | |
| 
 | |
| DMADATA_START = 0x7430
 | |
| 
 | |
| double_cache_prevention = threading.Lock()
 | |
| 
 | |
| class Rom(BigStream):
 | |
|     original = None
 | |
| 
 | |
|     def __init__(self, file=None, force_use=False):
 | |
|         super().__init__([])
 | |
| 
 | |
|         self.changed_address = {}
 | |
|         self.changed_dma = {}
 | |
|         self.force_patch = []
 | |
| 
 | |
|         if file is None:
 | |
|             return
 | |
| 
 | |
|         decomp_file = user_path('ZOOTDEC.z64')
 | |
| 
 | |
|         with open(data_path('generated/symbols.json'), 'r') as stream:
 | |
|             symbols = json.load(stream)
 | |
|             self.symbols = {name: int(addr, 16) for name, addr in symbols.items()}
 | |
| 
 | |
|         # If decompressed file already exists, read from it
 | |
|         if not force_use:
 | |
|             if os.path.exists(decomp_file):
 | |
|                 file = decomp_file
 | |
| 
 | |
|             if file == '':
 | |
|                 # if not specified, try to read from the previously decompressed rom
 | |
|                 file = decomp_file
 | |
|                 try:
 | |
|                     self.read_rom(file)
 | |
|                 except FileNotFoundError:
 | |
|                     # could not find the decompressed rom either
 | |
|                     raise FileNotFoundError('Must specify path to base ROM')
 | |
|             else:
 | |
|                 self.read_rom(file)
 | |
|         else:
 | |
|             self.read_rom(file)
 | |
| 
 | |
|         # decompress rom, or check if it's already decompressed
 | |
|         self.decompress_rom_file(file, decomp_file, force_use)
 | |
| 
 | |
|         # Add file to maximum size
 | |
|         self.buffer.extend(bytearray([0x00] * (0x4000000 - len(self.buffer))))
 | |
|         with double_cache_prevention:
 | |
|             if not self.original:
 | |
|                 Rom.original = self.copy()
 | |
| 
 | |
|         # Add version number to header.
 | |
|         self.write_bytes(0x35, get_version_bytes(__version__))
 | |
|         self.force_patch.extend([0x35, 0x36, 0x37])
 | |
| 
 | |
|     def copy(self):
 | |
|         new_rom = Rom()
 | |
|         new_rom.buffer = copy.copy(self.buffer)
 | |
|         new_rom.changed_address = copy.copy(self.changed_address)
 | |
|         new_rom.changed_dma = copy.copy(self.changed_dma)
 | |
|         new_rom.force_patch = copy.copy(self.force_patch)
 | |
|         return new_rom
 | |
| 
 | |
|     def decompress_rom_file(self, file, decomp_file, skip_crc_check):
 | |
|         validCRC = [
 | |
|             [0xEC, 0x70, 0x11, 0xB7, 0x76, 0x16, 0xD7, 0x2B],  # Compressed
 | |
|             [0x70, 0xEC, 0xB7, 0x11, 0x16, 0x76, 0x2B, 0xD7],  # Byteswap compressed
 | |
|             [0x93, 0x52, 0x2E, 0x7B, 0xE5, 0x06, 0xD4, 0x27],  # Decompressed
 | |
|         ]
 | |
| 
 | |
|         # Validate ROM file
 | |
|         file_name = os.path.splitext(file)
 | |
|         romCRC = list(self.buffer[0x10:0x18])
 | |
|         if romCRC not in validCRC and not skip_crc_check:
 | |
|             # Bad CRC validation
 | |
|             raise RuntimeError('ROM file %s is not a valid OoT 1.0 US ROM.' % file)
 | |
|         elif len(self.buffer) < 0x2000000 or len(self.buffer) > (0x4000000) or file_name[1].lower() not in ['.z64',
 | |
|                                                                                                             '.n64']:
 | |
|             # ROM is too big, or too small, or not a bad type
 | |
|             raise RuntimeError('ROM file %s is not a valid OoT 1.0 US ROM.' % file)
 | |
|         elif len(self.buffer) == 0x2000000:
 | |
|             # If Input ROM is compressed, then Decompress it
 | |
| 
 | |
|             sub_dir = data_path("Decompress")
 | |
| 
 | |
|             if platform.system() == 'Windows':
 | |
|                 subcall = [sub_dir + "\\Decompress.exe", file, decomp_file]
 | |
|             elif platform.system() == 'Linux':
 | |
|                 if platform.uname()[4] == 'aarch64' or platform.uname()[4] == 'arm64':
 | |
|                     subcall = [sub_dir + "/Decompress_ARM64", file, decomp_file]
 | |
|                 else:
 | |
|                     subcall = [sub_dir + "/Decompress", file, decomp_file]
 | |
|             elif platform.system() == 'Darwin':
 | |
|                 subcall = [sub_dir + "/Decompress.out", file, decomp_file]
 | |
|             else:
 | |
|                 raise RuntimeError(
 | |
|                     'Unsupported operating system for decompression. Please supply an already decompressed ROM.')
 | |
| 
 | |
|             if not os.path.exists(subcall[0]):
 | |
|                 raise RuntimeError(f'Decompressor does not exist! Please place it at {subcall[0]}.')
 | |
|             subprocess.call(subcall, **subprocess_args())
 | |
|             self.read_rom(decomp_file)
 | |
|         else:
 | |
|             # ROM file is a valid and already uncompressed
 | |
|             pass
 | |
| 
 | |
|     def write_byte(self, address, value):
 | |
|         super().write_byte(address, value)
 | |
|         self.changed_address[self.last_address - 1] = value
 | |
| 
 | |
|     def write_bytes(self, address, values):
 | |
|         super().write_bytes(address, values)
 | |
|         self.changed_address.update(zip(range(address, address + len(values)), values))
 | |
| 
 | |
|     def restore(self):
 | |
|         self.buffer = copy.copy(self.original.buffer)
 | |
|         self.changed_address = {}
 | |
|         self.changed_dma = {}
 | |
|         self.force_patch = []
 | |
|         self.last_address = None
 | |
|         self.write_bytes(0x35, get_version_bytes(__version__))
 | |
|         self.force_patch.extend([0x35, 0x36, 0x37])
 | |
| 
 | |
|     def sym(self, symbol_name):
 | |
|         return self.symbols.get(symbol_name)
 | |
| 
 | |
|     def write_to_file(self, file):
 | |
|         self.verify_dmadata()
 | |
|         self.update_header()
 | |
|         with open(file, 'wb') as outfile:
 | |
|             outfile.write(self.buffer)
 | |
| 
 | |
|     def update_header(self):
 | |
|         crc = calculate_crc(self)
 | |
|         self.write_bytes(0x10, crc)
 | |
| 
 | |
|     def read_rom(self, file):
 | |
|         # "Reads rom into bytearray"
 | |
|         try:
 | |
|             with open(file, 'rb') as stream:
 | |
|                 self.buffer = bytearray(stream.read())
 | |
|         except FileNotFoundError as ex:
 | |
|             raise FileNotFoundError('Invalid path to Base ROM: "' + file + '"')
 | |
| 
 | |
|     # dmadata/file management helper functions
 | |
| 
 | |
|     def _get_dmadata_record(self, cur):
 | |
|         start = self.read_int32(cur)
 | |
|         end = self.read_int32(cur + 0x04)
 | |
|         size = end - start
 | |
|         return start, end, size
 | |
| 
 | |
|     def get_dmadata_record_by_key(self, key):
 | |
|         cur = DMADATA_START
 | |
|         dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
|         while True:
 | |
|             if dma_start == 0 and dma_end == 0:
 | |
|                 return None
 | |
|             if dma_start == key:
 | |
|                 return dma_start, dma_end, dma_size
 | |
|             cur += 0x10
 | |
|             dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
| 
 | |
|     def verify_dmadata(self):
 | |
|         cur = DMADATA_START
 | |
|         overlapping_records = []
 | |
|         dma_data = []
 | |
| 
 | |
|         while True:
 | |
|             this_start, this_end, this_size = self._get_dmadata_record(cur)
 | |
| 
 | |
|             if this_start == 0 and this_end == 0:
 | |
|                 break
 | |
| 
 | |
|             dma_data.append((this_start, this_end, this_size))
 | |
|             cur += 0x10
 | |
| 
 | |
|         dma_data.sort(key=lambda v: v[0])
 | |
| 
 | |
|         for i in range(0, len(dma_data) - 1):
 | |
|             this_start, this_end, this_size = dma_data[i]
 | |
|             next_start, next_end, next_size = dma_data[i + 1]
 | |
| 
 | |
|             if this_end > next_start:
 | |
|                 overlapping_records.append(
 | |
|                     '0x%08X - 0x%08X (Size: 0x%04X)\n0x%08X - 0x%08X (Size: 0x%04X)' % \
 | |
|                     (this_start, this_end, this_size, next_start, next_end, next_size)
 | |
|                 )
 | |
| 
 | |
|         if len(overlapping_records) > 0:
 | |
|             raise Exception("Overlapping DMA Data Records!\n%s" % \
 | |
|                             '\n-------------------------------------\n'.join(overlapping_records))
 | |
| 
 | |
|     # update dmadata record with start vrom address "key"
 | |
|     # if key is not found, then attempt to add a new dmadata entry
 | |
|     def update_dmadata_record(self, key, start, end, from_file=None):
 | |
|         cur, dma_data_end = self.get_dma_table_range()
 | |
|         dma_index = 0
 | |
|         dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
|         while dma_start != key:
 | |
|             if dma_start == 0 and dma_end == 0:
 | |
|                 break
 | |
| 
 | |
|             cur += 0x10
 | |
|             dma_index += 1
 | |
|             dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
| 
 | |
|         if cur >= (dma_data_end - 0x10):
 | |
|             raise Exception('dmadata update failed: key {0:x} not found in dmadata and dma table is full.'.format(key))
 | |
|         else:
 | |
|             self.write_int32s(cur, [start, end, start, 0])
 | |
|             if from_file == None:
 | |
|                 if key == None:
 | |
|                     from_file = -1
 | |
|                 else:
 | |
|                     from_file = key
 | |
|             self.changed_dma[dma_index] = (from_file, start, end - start)
 | |
| 
 | |
|     def get_dma_table_range(self):
 | |
|         cur = DMADATA_START
 | |
|         dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
|         while True:
 | |
|             if dma_start == 0 and dma_end == 0:
 | |
|                 raise Exception('Bad DMA Table: DMA Table entry missing.')
 | |
| 
 | |
|             if dma_start == DMADATA_START:
 | |
|                 return (DMADATA_START, dma_end)
 | |
| 
 | |
|             cur += 0x10
 | |
|             dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
| 
 | |
|     # This will scan for any changes that have been made to the DMA table
 | |
|     # This assumes any changes here are new files, so this should only be called
 | |
|     # after patching in the new files, but before vanilla files are repointed
 | |
|     def scan_dmadata_update(self):
 | |
|         cur = DMADATA_START
 | |
|         dma_data_end = None
 | |
|         dma_index = 0
 | |
|         dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
|         old_dma_start, old_dma_end, old_dma_size = self.original._get_dmadata_record(cur)
 | |
| 
 | |
|         while True:
 | |
|             if (dma_start == 0 and dma_end == 0) and \
 | |
|                     (old_dma_start == 0 and old_dma_end == 0):
 | |
|                 break
 | |
| 
 | |
|             # If the entries do not match, the flag the changed entry
 | |
|             if not (dma_start == old_dma_start and dma_end == old_dma_end):
 | |
|                 self.changed_dma[dma_index] = (-1, dma_start, dma_end - dma_start)
 | |
| 
 | |
|             cur += 0x10
 | |
|             dma_index += 1
 | |
|             dma_start, dma_end, dma_size = self._get_dmadata_record(cur)
 | |
|             old_dma_start, old_dma_end, old_dma_size = self.original._get_dmadata_record(cur)
 | |
| 
 | |
|     # gets the last used byte of rom defined in the DMA table
 | |
|     def free_space(self):
 | |
|         cur = DMADATA_START
 | |
|         max_end = 0
 | |
| 
 | |
|         while True:
 | |
|             this_start, this_end, this_size = self._get_dmadata_record(cur)
 | |
| 
 | |
|             if this_start == 0 and this_end == 0:
 | |
|                 break
 | |
| 
 | |
|             max_end = max(max_end, this_end)
 | |
|             cur += 0x10
 | |
|         max_end = ((max_end + 0x0F) >> 4) << 4
 | |
|         return max_end
 | |
| 
 | |
| 
 | |
| def compress_rom_file(input_file, output_file):
 | |
|     compressor_path = "."
 | |
| 
 | |
|     if platform.system() == 'Windows':
 | |
|         executable_path = "Compress.exe"
 | |
|     elif platform.system() == 'Linux':
 | |
|         if platform.uname()[4] == 'aarch64' or platform.uname()[4] == 'arm64':
 | |
|             executable_path = "Compress_ARM64"
 | |
|         else:
 | |
|             executable_path = "Compress"
 | |
|     elif platform.system() == 'Darwin':
 | |
|         executable_path = "Compress.out"
 | |
|     else:
 | |
|         raise RuntimeError('Unsupported operating system for compression.')
 | |
|     compressor_path = os.path.join(compressor_path, executable_path)
 | |
|     if not os.path.exists(compressor_path):
 | |
|         raise RuntimeError(f'Compressor does not exist! Please place it at {compressor_path}.')
 | |
|     import logging
 | |
|     logging.info(subprocess.check_output([compressor_path, input_file, output_file],
 | |
|                                              **subprocess_args(include_stdout=False)))
 | 
