#!/usr/bin/env python3
#
# io.py
"""
File IO operations.
"""
#
# Copyright © 2025 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
# DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
# OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
# OR OTHER DEALINGS IN THE SOFTWARE.
#
# stdlib
import binascii
import inspect
import struct
import warnings
from collections.abc import Iterator
from typing import IO, Any, NamedTuple, TypeVar
# 3rd party
from domdf_python_tools.paths import PathPlus
from domdf_python_tools.typing import PathLike
# this package
from cp2077_extractor.cr2w.datatypes import Chunk, lookup_type
from cp2077_extractor.cr2w.utils import get_names_list
from cp2077_extractor.utils import decompress
# this package
from .header_structs import (
CR2WBufferInfo,
CR2WEmbeddedInfo,
CR2WExportInfo,
CR2WFile,
CR2WFileHeader,
CR2WFileInfo,
CR2WImport,
CR2WImportInfo,
CR2WMetadata,
CR2WNameInfo,
CR2WProperty,
CR2WPropertyInfo,
CR2WTable,
Struct
)
# Names exported by ``from <this module> import *``.
__all__ = [
"CNameError",
"ParsingData",
"parse_cr2w_buffer",
"parse_cr2w_file",
"read_buffer",
"read_c_name",
"read_chunk",
"read_file_info",
"read_struct",
"read_tables",
]
# Type variable bound to ``Struct``; lets ``read_tables`` and ``read_struct``
# return instances of whichever struct class they are given.
_S = TypeVar("_S", bound=Struct)
def read_tables(fp: IO, table_struct: type[_S], header: CR2WTable) -> Iterator[_S]:
    """
    Read every entry of one table from the opened file.

    The whole table is read in a single call and its CRC32 is compared (with an
    assertion) against the value recorded in ``header`` before any entry is decoded.
    As this is a generator, nothing is read until iteration starts.

    :param fp: Open binary file; the table is read from the current position.
    :param table_struct: The struct class describing a single table entry.
    :param header: The table's header, giving the entry count and the expected checksum.

    :returns: An iterator over instances of ``table_struct``.
    """

    entry_size = table_struct._size
    raw_table = fp.read(entry_size * header.item_count)

    checksum = binascii.crc32(raw_table)
    assert checksum == header.crc32, (checksum, header.crc32)

    entry_format = table_struct._struct_format
    for entry_index in range(header.item_count):
        start = entry_index * entry_size
        fields = struct.unpack(entry_format, raw_table[start:start + entry_size])
        yield table_struct(*fields)
class CNameError(Exception):
    """
    Raised by :func:`read_c_name` when the name read from the file is ``b"None"``.
    """
def read_c_name(fp: IO, names_list: list[bytes]) -> bytes:
    """
    Read a name reference from the open file and resolve it.

    A little-endian unsigned 16-bit index is read from ``fp`` and used to
    look the name up in ``names_list``. The index being in range, and the
    resolved name being non-empty, are checked with assertions.

    :param fp: Open binary file; two bytes are consumed from the current position.
    :param names_list: Ordered list of names used in the file, for lookups.

    :returns: The name stored at the index that was read.

    :raises CNameError: If the name at that index is ``b"None"``.
    """

    (name_index, ) = struct.unpack("<H", fp.read(2))
    assert name_index < len(names_list)

    resolved = names_list[name_index]
    assert resolved

    if resolved != b"None":
        return resolved

    raise CNameError()
def read_struct(fp: IO, struct_type: type[_S]) -> _S:
    """
    Read one instance of the given struct from the open file.

    ``struct_type._size`` bytes are consumed from the current position and
    decoded using ``struct_type._struct_format``.

    :param fp: Open binary file.
    :param struct_type: The struct class to decode and instantiate.

    :returns: A ``struct_type`` built from the unpacked fields.
    """

    layout = struct_type._struct_format
    raw = fp.read(struct_type._size)
    fields = struct.unpack(layout, raw)
    return struct_type(*fields)
def _read_string_table(fp: IO, offset: int, size: int) -> dict[int, bytes]:
    """
    Read the block of NUL-terminated strings that starts at ``offset``.

    :param fp: Open binary file; the caller is expected to have positioned it at ``offset``.
    :param offset: Absolute position of the start of the string block.
    :param size: Length of the string block, in bytes.

    :returns: Mapping of each string's position (relative to ``offset``) to its value.
        Empty strings are stored as ``b"None"``.

    :raises EOFError: If the file ends before a string's NUL terminator is found.
    """

    string_dict: dict[int, bytes] = {}
    block_end = offset + size

    while fp.tell() < block_end:
        pos = fp.tell() - offset
        string = bytearray()

        while True:
            char = fp.read(1)
            if not char:
                # ``read`` returns an empty bytes object at EOF; without this
                # check a truncated file would loop here forever.
                raise EOFError("Unexpected end of file while reading the string table")
            if char == b"\0":
                break
            string += char

        string_dict[pos] = bytes(string) or b"None"

    return string_dict


def read_file_info(fp: IO) -> CR2WFileInfo:
    """
    Read the file header and metadata.

    Reads, in order: the ``CR2W`` magic, the file header, ten table headers,
    the string block, and the name, import, property, export, buffer and
    embedded-file tables. Structural expectations (magic bytes, the position of
    the string block, offsets being present in the string block) are checked
    with assertions.

    :param fp: Open binary file; reading starts at the current position, which must hold the ``CR2W`` magic.

    :raises ValueError: If the file's version is less than 163 or greater than 195.
    :raises EOFError: If the file ends part-way through the string block.
    """

    magic = fp.read(4)
    assert magic == b"CR2W"

    # File Header
    file_header = read_struct(fp, CR2WFileHeader)  # type: ignore[type-var]
    if file_header.version > 195 or file_header.version < 163:
        raise ValueError("Unsupported Version")

    # Tables [7-9] are not used in cr2w so far.
    table_headers = [read_struct(fp, CR2WTable) for _ in range(10)]  # type: ignore[type-var]

    # Read strings - block 1 (index 0).
    # For this table ``item_count`` is used as the length of the block in bytes.
    strings_header = table_headers[0]
    assert fp.tell() == strings_header.offset, (fp.tell(), strings_header.offset)
    string_dict = _read_string_table(fp, strings_header.offset, strings_header.item_count)

    # Read the other tables
    name_info: list[CR2WNameInfo] = list(read_tables(fp, CR2WNameInfo, table_headers[1]))  # type: ignore[type-var]
    import_info: list[CR2WImportInfo] = list(
        read_tables(fp, CR2WImportInfo, table_headers[2]),  # type: ignore[type-var]
    )
    property_info: list[CR2WPropertyInfo] = list(
        read_tables(fp, CR2WPropertyInfo, table_headers[3]),  # type: ignore[type-var]
    )
    export_info: list[CR2WExportInfo] = list(
        read_tables(fp, CR2WExportInfo, table_headers[4]),  # type: ignore[type-var]
    )
    buffer_info: list[CR2WBufferInfo] = list(
        read_tables(fp, CR2WBufferInfo, table_headers[5]),  # type: ignore[type-var]
    )
    embedded_info: list[CR2WEmbeddedInfo] = list(
        read_tables(fp, CR2WEmbeddedInfo, table_headers[6]),  # type: ignore[type-var]
    )

    # Resolve each name entry to its string via its offset into the string block.
    _names_list: list[bytes] = []
    for a_name_info in name_info:
        assert a_name_info.offset in string_dict
        _names_list.append(string_dict[a_name_info.offset])

    _imports_list = []
    for an_import_info in import_info:
        assert an_import_info.offset in string_dict
        ret = CR2WImport(
            class_name=_names_list[an_import_info.class_name],
            depot_path=b'',  # TODO: = depot_path or '',
            flags=an_import_info.flags,
        )
        _imports_list.append(ret)

    return CR2WFileInfo(
        file_header=file_header,
        string_dict=string_dict,
        name_info=name_info,
        import_info=import_info,
        property_info=property_info,
        export_info=export_info,
        buffer_info=buffer_info,
        embedded_info=embedded_info,
        imports=_imports_list,
    )
def read_chunk(fp: IO, chunk_index: int, file_info: CR2WFileInfo) -> tuple[bytes, bytes]:
    """
    Read the raw data of one export chunk from the file.

    The file must already be positioned at the chunk's recorded data offset
    (checked with an assertion). If the position after reading does not match
    the recorded size, a warning is issued and the file is moved to where the
    chunk should have ended.

    :param fp: Open binary file.
    :param chunk_index: Index of the chunk in ``file_info.export_info``.
    :param file_info: The file's header and table data.

    :returns: A tuple of the raw chunk data and the chunk's datatype.
    """

    all_names = get_names_list(file_info)
    export = file_info.export_info[chunk_index]
    type_name = all_names[export.class_name]

    start = export.data_offset
    assert fp.tell() == start

    payload = fp.read(export.data_size)
    expected_end = start + export.data_size

    if fp.tell() != expected_end:
        warnings.warn("Chunk size mismatch! Could lead to problems")
        fp.seek(expected_end)

    return payload, type_name
def read_buffer(fp: IO, info: CR2WBufferInfo) -> bytes:
    """
    Read one buffer from the CR2W/W2RC file, decompressing it if necessary.

    The file must already be positioned at ``info.offset`` (checked with an
    assertion). Data starting with ``KARK`` is treated as compressed: the next
    four bytes give the little-endian decompressed size, and the remainder is
    passed to ``decompress``.

    :param fp: Open binary file.
    :param info: Metadata about the buffer.

    :returns: The buffer's data.
    """

    assert fp.tell() == info.offset

    # NOTE: ``info.mem_size`` bytes are read here, not ``info.disk_size``.
    raw = fp.read(info.mem_size)

    # TODO: check crc32 against ``info.crc32`` (figure out what the input data is)

    if raw[:4] != b"KARK":
        return raw

    # Compressed with oodle
    unpacked_size = int.from_bytes(raw[4:8], "little")
    return decompress(raw[8:], unpacked_size)
class ParsingData(NamedTuple):
    """
    Intermediate data gathered while parsing a CR2W/W2RC file.
    """

    #: The file's names, in order, for lookups by index.
    names_list: list[bytes]

    #: One ``(raw chunk data, chunk datatype)`` tuple per chunk.
    chunks: list[tuple[bytes, bytes]]

    #: One ``(raw buffer data, buffer metadata)`` tuple per buffer.
    buffers: list[tuple[bytes, CR2WBufferInfo]]
def parse_cr2w_file(filename: PathLike) -> CR2WFile:
    """
    Open the CR2W/W2RC file at the given path and parse it.

    The file is opened in binary mode and handed, together with its path,
    to :func:`parse_cr2w_buffer`.

    :param filename: Path of the file to parse.
    """

    path = PathPlus(filename)

    with path.open("rb") as stream:
        parsed = parse_cr2w_buffer(stream, path)

    return parsed
def parse_cr2w_buffer(fp: IO, filename: PathLike | None = None) -> CR2WFile:
    """
    Parse a CR2W/W2RC file from an already-opened file object.

    Reads the header and tables, then every export chunk and buffer, and
    decodes the first chunk as the root chunk. A warning is issued if any
    bytes are left unread afterwards.

    :param fp: Open binary file.
    :param filename: Optionally, the path of the opened file for inclusion in metadata.

    :raises ValueError: If the file has no property info entries.
    """

    info = read_file_info(fp)
    assert info.string_dict, "Malformed file"

    # TODO: read each property (``read_property(property_info)``); placeholders are used for now.
    properties: list[CR2WProperty] = [CR2WProperty() for _ in info.property_info]
    if len(properties) == 0:
        raise ValueError("Found unsupported PropertyInfo")

    # TODO: ensure CHandle/CWeakHandle can be resolved

    chunks: list[tuple[bytes, bytes]] = [
        read_chunk(fp, export_index, info) for export_index in range(len(info.export_info))
    ]
    buffer_data: list[tuple[bytes, CR2WBufferInfo]] = [
        (read_buffer(fp, a_buffer_info), a_buffer_info) for a_buffer_info in info.buffer_info
    ]

    parsing_data = ParsingData(get_names_list(info), chunks, buffer_data)

    # The first chunk is the root; its datatype name selects the class used to decode it.
    root_data, root_type_name = chunks[0]
    root_type = lookup_type(root_type_name)
    assert inspect.isclass(root_type)
    assert issubclass(root_type, Chunk)
    root_chunk = root_type.from_chunk(root_data, parsing_data)

    # TODO: read embedded files (one per entry in ``info.embedded_info``)
    embedded_files: list[Any] = []  # TODO: value type

    # TODO: check fp.tell() against header field giving file length (if there is one)
    leftover = len(fp.read(999999))
    if leftover:
        warnings.warn(f"{leftover} bytes remaining in file!")

    meta_filename = PathPlus(filename).abspath().as_posix() if filename else None

    # TODO: identify the hash version from the first non-empty string and its recorded hash
    # (field 0 is always empty), and raise ValueError if it cannot be identified.
    hash_version = None

    metadata = CR2WMetadata(
        file_name=meta_filename,
        version=info.file_header.version,
        build_version=info.file_header.build_version,
        objects_end=info.file_header.objects_end,
        hash_version=hash_version,
    )

    return CR2WFile(
        info=info,
        metadata=metadata,
        properties=properties,
        root_chunk=root_chunk,
        embedded_files=embedded_files,
    )