Files
python-bzz/main.py
Yavin7 08bb78e3f9 Fixed reference error for copy_index
Identified Num24+1/2 issue from documentation, and added handling for it
Fixed references to temp stuffs
2025-09-23 23:13:51 -06:00

333 lines
15 KiB
Python

import os
from bitarray import bitarray
from pathlib import Path
class BZZCompressor:
    """Decoder for ``.bzz`` archives.

    An archive starts with a 0x800-byte header followed by one compressed
    stream per stored file. Only decompression is implemented.
    Format notes: https://problemkaputt.de/psxspx-cdrom-file-compression-bzz.htm
    """

    def decompress(
        self, input_file_path, input_file, output_file=False, output_folder_path="out/"
    ) -> "list[dict] | None":
        """Decompress every file stored in one ``.bzz`` archive.

        A decoding error inside one stored file is printed and that file is
        emitted with whatever was decoded up to that point; processing then
        continues with the next stored file.

        Args:
            input_file_path: Directory that contains the archive.
            input_file: Name of the archive inside ``input_file_path``.
            output_file: When true, each decompressed file (plus any non-zero
                data found after the last stored file) is written into
                ``output_folder_path`` and ``None`` is returned. When false,
                nothing is written and the decompressed files are returned.
            output_folder_path: Destination directory used when ``output_file``
                is true. It is not created here, so it must already exist.

        Returns:
            ``None`` when ``output_file`` is true, otherwise a list holding one
            ``{"type": <hex string>, "data": <bytearray>}`` dict per stored file.

        Raises:
            IOError: The archive could not be read or an output file could not
                be written.
            IndexError: The archive ends before the method byte or the first
                flag byte of a stored file.
        """
        return_files = []
        archive_path = f"{input_file_path}/{input_file}"

        try:
            with open(archive_path, "rb") as infile:
                data = infile.read()
        except IOError as err:
            raise IOError(f"Could not open {archive_path}...") from err

        ##############################################################################
        #
        # Header layout (all fields little-endian, 4 bytes each):
        #   0x000  version (always 1 according to the original author)
        #   0x004  game id
        #   0x008  number of files
        #   0x00C  file table: 12 bytes per file -> type, file_end, padding_end
        #   0x7FC  what the original author describes as a checksum
        # Only the file count and the file table are needed for decoding.
        #
        ##############################################################################
        num_files = int.from_bytes(data[8:12], "little")

        files = []
        for entry_i in range(num_files):
            entry = 12 + entry_i * 12
            files.append(
                {
                    # Kept as a hex string: it is used as the output file suffix
                    # and handed back to the caller as-is.
                    "type": hex(int.from_bytes(data[entry : entry + 4], "little")),
                    "file_end": int.from_bytes(data[entry + 4 : entry + 8], "little"),
                    "padding_end": int.from_bytes(
                        data[entry + 8 : entry + 12], "little"
                    ),
                }
            )

        ##############################################################################
        #
        # File loop: decode each stored stream, starting right after the header.
        #
        ##############################################################################
        starting_index = 0x800

        for file_num, file in enumerate(files, start=1):
            index = starting_index
            output_buffer = bytearray()
            file_end = starting_index + file["file_end"]

            # The first byte of a stream selects the decoding parameters.
            method = data[index]
            index += 1

            shifter = (method >> 3) & 0x03
            len_bits = (method & 0x07) ^ 0x07
            len_mask = (1 << len_bits) - 1
            threshold = len_mask >> 1
            if threshold > 0x07:
                threshold = 0x13

            # Maps the low `len_bits` bits of a back-reference to a copy length.
            # NOTE(review): the table has `len_mask` entries (indices
            # 0..len_mask-1) while lookups use `distance_data & len_mask`, which
            # can equal `len_mask`. That lookup is handled by the special case
            # further down; the linked format notes appear to fill the table for
            # 0..len_mask inclusive - confirm before changing either side.
            len_table = [
                ((i - threshold) << shifter) + threshold + 3 if i > threshold else i + 3
                for i in range(len_mask)
            ]

            # Number of flag bits to process, stored as a 24-bit big-endian value.
            num_flags = int.from_bytes(data[index : index + 3], "big") + 1
            index += 3

            # Adding 0x100 puts a sentinel bit above the 8 flag bits, so the
            # value only becomes 0 after all 8 real flags have been shifted out.
            try:
                flag_bits = data[index] + 0x100
            except IndexError as err:
                raise IndexError(
                    f"Error decompressing {input_file_path}/{input_file} on {file_num}/{len(files)}. flag_bits Index was out of range.\n"
                    + f"Buffer Length: {len(output_buffer)}. File Start: {hex(starting_index)}. File End: {hex(file_end)}. "
                    + f"\nLoops left: {num_flags}"
                    + f"\nRemaining Data: {data[index-1:index+15]}"
                ) from err
            index += 1

            try:
                while num_flags > 0:
                    carry = flag_bits & 1
                    flag_bits >>= 1

                    # Only the sentinel was left: fetch the next flag byte and
                    # restart the iteration without consuming a flag.
                    if flag_bits == 0:
                        try:
                            flag_bits = data[index] + 0x100
                        except IndexError as err:
                            raise IndexError(
                                f"Error decompressing {input_file_path}/{input_file} on {file_num}/{len(files)}. flag_bits Index was out of range.\n"
                                + f"Buffer Length: {len(output_buffer)}. File Start: {hex(starting_index)}. File End: {hex(file_end)}. "
                                + f"\nLoops left: {num_flags}"
                                + f"\nRemaining Data: {data[index-1:index+15]}"
                            ) from err
                        index += 1
                        continue

                    if carry:
                        # Flag 1: the next input byte is a literal.
                        try:
                            output_buffer.append(data[index])
                        except IndexError as err:
                            raise IndexError(
                                f"Error processing {input_file_path}/{input_file} on {file_num}/{len(files)} (Carry Path). Reached of data stream early. Index: {index}\n"
                                + f"Index: {hex(index)}. File Start: {hex(starting_index)}. File End: {hex(file_end)}"
                                + f"\nLoops left: {num_flags}"
                            ) from err
                        index += 1
                    else:
                        # Flag 0: a 2-byte back-reference into the output.
                        if file_end <= index + 1:
                            # Running out of input is only accepted on the
                            # very last flag.
                            if num_flags == 1:
                                break
                            raise IndexError(
                                f"Error processing {input_file_path}/{input_file} on {file_num}/{len(files)} (Non-Carry Path). Reached of data stream early.\n"
                                + f"Index: {hex(index)}. File Start: {hex(starting_index)}. File End: {hex(file_end)}"
                                + f"\nLoops left: {num_flags}"
                            )

                        # The "temp" value of the format notes: displacement in
                        # the high bits, length-table index in the low bits.
                        distance_data = int.from_bytes(data[index : index + 2], "big")
                        index += 2

                        # Special case kept from the original implementation: a
                        # table index of 15 falls outside `len_table`; it is
                        # decoded with a fixed length of 26, and if that copy
                        # turns out to be impossible the two bytes are pushed
                        # back and the flag is skipped.
                        num_25_check = False
                        try:
                            length = len_table[distance_data & len_mask]
                        except IndexError as err:
                            if (distance_data & len_mask) == 15:
                                num_25_check = True
                                length = 26
                            else:
                                raise IndexError(
                                    f"Distant Data ({bin(distance_data)}) & Length Mask ({bin(len_mask)}) (which is {distance_data & len_mask}) is outside of the Length Table index range."
                                    + f"\nLoops left: {num_flags}."
                                    + f"\nRemaining Data: {data[index-1:index+15]}"
                                ) from err

                        # How far back in the output the copy starts.
                        displacement = distance_data >> len_bits
                        if displacement <= 0:
                            if num_25_check:
                                index -= 2
                                continue
                            raise ValueError(
                                f"Error processing {input_file_path}/{input_file} on {file_num}/{len(files)}. Displacement was less than or equal to 0.\n"
                                + f"Displacement: {displacement}. Distance Data: {distance_data}. Length Bits: {len_bits}. Length: {length}."
                            )

                        # 1-based position of the first byte to copy.
                        copy_index = len(output_buffer) - displacement + 1
                        # NOTE(review): copy_index == 0 passes this guard and
                        # makes the first read below use index -1 (the last
                        # byte of the buffer) - verify whether `< 1` is meant.
                        if copy_index < 0:
                            if num_25_check:
                                index -= 2
                                continue
                            raise IndexError(
                                f"Error decompressing {input_file_path}/{input_file} on {file_num}/{len(files)}. Displacement Index was out of range.\n"
                                + f"Copy Index: {hex(copy_index)}. Displacement: {displacement}. Buffer Length: {hex(len(output_buffer))}. "
                                + f"Distance Data: {distance_data} {bin(distance_data)}. File Start: {hex(starting_index)}. File End: {hex(file_end)}. "
                                + f"\nLoops left: {num_flags}. Index: {hex(index)}"
                                + f"\nRemaining Data: {data[index-1:index+15]}"
                            )

                        # Copy byte by byte instead of slicing: the source range
                        # may overlap bytes appended by this very copy.
                        try:
                            for offset in range(length):
                                output_buffer.append(
                                    output_buffer[copy_index + offset - 1]
                                )
                        except IndexError as err:
                            if num_25_check:
                                index -= 2
                                continue
                            raise IndexError(
                                f"Error decompressing {input_file_path}/{input_file} on {file_num}/{len(files)}. Index was out of data range.\n"
                                + f"Copy Index: {hex(copy_index)}. Displacement: {displacement}. Buffer Length: {hex(len(output_buffer))}. "
                                + f"Distance Data: {distance_data}. File Start: {hex(starting_index)}. File End: {hex(file_end)}. "
                                + f"\nLoops left: {num_flags}"
                                + f"\nRemaining Data: {data[index-1:index+15]}"
                            ) from err

                    num_flags -= 1
            except Exception as e:
                # Best effort: report the problem and keep the partial output so
                # the remaining files of the archive are still processed.
                print(
                    f"\nError while processing {input_file_path}/{input_file[:-4]}_{hex(file_num)[2:].zfill(3)}. \nError: {e}"
                )

            # The next stored file starts after this one's padding.
            starting_index = starting_index + file["padding_end"]

            if output_file:
                try:
                    with open(
                        f"{output_folder_path}/{input_file}_{str(file_num).zfill(3)}.file{file['type'][2:]}",
                        "wb",
                    ) as outfile:
                        outfile.write(output_buffer)
                except IOError as e:
                    raise IOError(
                        f"Unable to write file for {input_file_path}/{input_file} on {file_num}/{len(files)}. Error: {e}"
                    ) from e
            else:
                return_files.append(
                    {
                        "type": file["type"],
                        "data": output_buffer,
                    }
                )

        if not output_file:
            return return_files

        # Anything after the last stored file is saved separately, but only if
        # it contains at least one non-zero byte.
        overflow = data[starting_index:]
        if any(overflow):
            with open(
                f"{output_folder_path}/{input_file}.overflow.file", "wb"
            ) as outfile:
                outfile.write(overflow)
            print(
                f"File {output_folder_path}/{input_file}.overflow.file saved successfully!"
            )
        return None
if __name__ == "__main__":
    # Decompress every archive found below ./bin_extract into ./out, keeping
    # the same sub-directory layout.
    input_root = "./bin_extract"
    compressor = BZZCompressor()
    for dirpath, _dirnames, filenames in os.walk(input_root):
        for file in filenames:
            # ".bzz" has to appear somewhere after the first four characters
            # of the file name.
            if ".bzz" not in file[4:]:
                continue
            # relative_to() works with any path separator and avoids nesting
            # identical quotes inside an f-string (a SyntaxError before 3.12).
            output_folder_path = Path("out") / Path(dirpath).relative_to(input_root)
            output_folder_path.mkdir(parents=True, exist_ok=True)
            try:
                compressor.decompress(dirpath, file, True, output_folder_path)
            except Exception as e:
                # Keep going: one broken archive must not stop the whole batch.
                print(
                    f"\nError while decompressing {output_folder_path}/{file}. \nError: {e}"
                )
                continue