Source code for cp2077_extractor.utils

#!/usr/bin/env python3
#
#  utils.py
"""
General utility functions.
"""
#
#  Copyright © 2025 Dominic Davis-Foster <dominic@davis-foster.co.uk>
#
#  Permission is hereby granted, free of charge, to any person obtaining a copy
#  of this software and associated documentation files (the "Software"), to deal
#  in the Software without restriction, including without limitation the rights
#  to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
#  copies of the Software, and to permit persons to whom the Software is
#  furnished to do so, subject to the following conditions:
#
#  The above copyright notice and this permission notice shall be included in all
#  copies or substantial portions of the Software.
#
#  THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
#  EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
#  MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
#  IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
#  DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR
#  OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE
#  OR OTHER DEALINGS IN THE SOFTWARE.
#

# stdlib
import random
from collections import deque
from io import BytesIO
from typing import Deque, Generic, TypeVar

# 3rd party
import lameenc  # type: ignore[import-not-found]
import regex as re  # type: ignore[import-untyped]
from domdf_python_tools.paths import PathPlus
from miniaudio import SoundFileInfo, vorbis_get_info, vorbis_read  # type: ignore[import-untyped]
from mutagen.id3 import ID3, TLEN
from wem2ogg import wem_to_ogg

__all__ = ["InfiniteList", "StringReader", "to_snake_case", "transcode_file"]

try:
	# 3rd party
	from kraken_decompressor import decompress
except ImportError:

	def decompress(src: bytes, dst_len: int) -> bytes:
		"""
		Placeholder used when ``kraken_decompressor`` cannot be imported.

		Keeps the name ``decompress`` importable from this module so that the
		failure only surfaces when decompression is actually attempted.

		:param src: Ignored by this stub.
		:param dst_len: Ignored by this stub.

		:raises NotImplementedError: Always.
		"""

		raise NotImplementedError(
				"Kraken decompression unavailable ('kraken-decompressor' not installed or unsupported platform)"
				)


def transcode_file(
		wem_filename: PathPlus,
		mp3_filename: PathPlus,
		length_range: tuple[int, int] | None = None,
		) -> None:
	"""
	Transcode a WWise ``.wem`` file to mp3 at 256kbps.

	The file is converted to Ogg Vorbis in memory, decoded to PCM, re-encoded with LAME,
	and written out with an ID3 tag carrying the track length (``TLEN``).

	.. note::

		This docstring previously stated that ``ffmpeg`` must be installed.
		Nothing in this function invokes it directly; whether ``wem_to_ogg`` needs it
		should be confirmed before relying on either claim.

	:param wem_filename: The ``.wem`` file to read.
	:param mp3_filename: The mp3 file to write. Not written if the file is skipped.
	:param length_range: ``(minimum, maximum)``, inclusive.
		Files with durations in seconds outside this range will be skipped.
		If :py:obj:`None` (or otherwise falsy) no filtering is done.
	"""

	# TODO: see how vgmstream gets length; probably in file header

	ogg_data = wem_to_ogg(wem_filename.read_bytes())
	ogg_info: SoundFileInfo = vorbis_get_info(ogg_data)

	length = ogg_info.duration  # Seconds

	if length_range and not (length_range[1] >= length >= length_range[0]):
		# Too short or too long; skip without writing anything.
		return

	pcm_data = bytes(vorbis_read(data=ogg_data).samples)

	encoder = lameenc.Encoder()
	encoder.set_bit_rate(256)
	# Take the input format from the decoded stream rather than assuming one.
	encoder.set_in_sample_rate(ogg_info.sample_rate)
	encoder.set_channels(ogg_info.nchannels)
	encoder.set_quality(2)  # 2-highest, 7-fastest
	mp3_data = encoder.encode(pcm_data)
	mp3_data += encoder.flush()  # Flush when finished encoding the entire stream

	tags = ID3()
	# TLEN is a text frame holding the length in whole milliseconds.
	# The value must be passed as ``text``; an unrecognised keyword such as ``data``
	# is not part of the frame and would leave the frame empty.
	tags.add(TLEN(encoding=0, text=str(round(length * 1000))))

	# NOTE(review): ``_prepare_data`` is a private mutagen method. It is used here to
	# render the tag to bytes so it can be prepended to the in-memory mp3 data;
	# presumably the ``4`` selects ID3v2.4 -- confirm against the mutagen version in use.
	data = tags._prepare_data(BytesIO(mp3_data), 0, 0, 4, '/', None)
	mp3_filename.write_bytes(data + mp3_data)
_T = TypeVar("_T")


class InfiniteList(Generic[_T]):
	"""
	Never-ending supply of items, dealt out in shuffled rounds.

	Every round yields each item once, in a random order.
	When a round runs out the next one is generated on demand.
	The most recently popped items are remembered so that they
	can be kept away from the start of the following round.

	:param items: Values to loop through. A copy is taken.
	"""

	_items: list[_T]
	_recent: Deque[_T]
	_working_items: list[_T]

	def __init__(self, items: list[_T]) -> None:
		self._items = items[:]

		# Remember up to 5 recent items, but always fewer than the total number of items,
		# so there is at least one item eligible to start a new round.
		# With no items at all an unbounded (and never used) deque is created.
		self._recent = deque(maxlen=min(len(items) - 1, 5)) if items else deque()

		self.repopulate()

	def repopulate(self) -> None:
		"""
		Repopulate the list with a new random order, avoiding recent items occurring soon.

		For the first ``maxlen`` positions of the new order (``maxlen`` being the size of
		the recent-items window) only items which are not in the window are considered,
		falling back to every remaining item if that leaves nothing to choose from.
		"""

		new_order: list[_T] = []
		self._working_items = new_order
		pool = self._items[:]

		while pool:
			window_size = self._recent.maxlen
			assert window_size is not None

			candidates = pool
			if len(new_order) < window_size:
				# Still near the start of the round; steer clear of recently popped items.
				candidates = [entry for entry in pool if entry not in self._recent] or pool

			picked = random.choice(candidates)
			new_order.append(picked)
			pool.remove(picked)

		# ``pop()`` takes from the back, so flip the list to hand items out in pick order.
		new_order.reverse()

	def pop(self) -> _T:
		"""
		Get the next item from the back of the list.

		The list is repopulated first if it has run out,
		and the returned item is recorded as recently used.
		"""

		if not self._working_items:
			self.repopulate()

		latest = self._working_items.pop()
		self._recent.append(latest)
		return latest
# NOTE: ``re`` here is the third-party ``regex`` module, which supports ``\p{..}`` Unicode categories.

# A lowercase letter immediately followed by an uppercase letter, e.g. the ``oB`` in ``fooBar``.
_case_boundary_re = re.compile(r"(\p{Ll})(\p{Lu})")

# An uppercase letter or a number, then an uppercase letter, then a lowercase letter,
# e.g. the ``PSe`` in ``HTTPServer``.
_single_letters_re = re.compile(r"(\p{Lu}|\p{N})(\p{Lu})(\p{Ll})")
def to_snake_case(value: str) -> str:
	"""
	Convert the given string into ``snake_case``.

	An underscore is inserted at each lowercase-to-uppercase boundary (``fooBar`` -> ``foo_bar``)
	and before the last capital of a run that is followed by a lowercase letter
	(``HTTPServer`` -> ``http_server``). The result is then lowercased.

	:param value: The string to convert.

	:returns: The converted string. Strings containing neither kind of boundary are simply lowercased.
	"""

	# Matches VSCode behaviour: two substitutions, one per pattern.
	# The second must use ``_single_letters_re``; its replacement refers to a third group
	# which ``_case_boundary_re`` does not have.
	# No pre-check is needed, as both substitutions are no-ops when nothing matches.
	value = _case_boundary_re.sub(r"\1_\2", value)
	value = _single_letters_re.sub(r"\1_\2\3", value)
	return value.lower()
class StringReader(BytesIO):
	"""
	Reader for REDengine sized strings.

	An in-memory byte stream with helpers for reading the variable-length
	signed integers and the length-prefixed strings built on top of them.
	"""

	# Selects the 7 data bits of a continuation octet.
	vlq_value_mask = 0b01111111
	# Set on a continuation octet when yet another octet follows.
	vlq_continuation = 0b10000000

	def parse_string_and_size(self) -> tuple[int, str]:
		"""
		Parse a length-prefixed string (as bytes) to a Python string.

		:returns: Tuple of length prefix and the string.
			The prefix is returned as read, i.e. including its sign.
		"""

		prefix = self.read_vlq_int32()

		if prefix == 0:
			return prefix, ''

		# Sign bit indicates whether UTF-16 (0) or UTF-8 (1)
		codec = "UTF-16" if prefix > 0 else "UTF-8"

		# The string length is the absolute value of the size prefix.
		# NOTE(review): the same number of *bytes* is read for both encodings. If the prefix
		# counts UTF-16 code units rather than bytes this reads too little -- confirm against the format.
		raw = self.read(abs(prefix))
		return prefix, raw.decode(codec)

	def parse_string(self) -> str:
		"""
		Parse a length-prefixed string (as bytes) to a Python string, discarding the length prefix.
		"""

		_, text = self.parse_string_and_size()
		return text

	def read_vlq_int32(self) -> int:
		"""
		Parse modified 32 bit VLQ to int.

		The first bit is the sign bit, the 2nd bit tells whether there are more octets to read,
		and the next 6 bits are the least significant bits of the number data.
		Remaining octets are 1+7 continuation and data.

		:raises ValueError: If a fifth octet still has its continuation bit set.
		:raises IndexError: If the stream ends before the number is complete.
		"""

		head = self.read(1)[0]
		negative = bool(head & 0b10000000)

		# Take the initial value from the lower 6 bits
		magnitude = head & 0b00111111

		# The first octet stores the continuation flag in the 6th bit
		if head & 0b01000000:
			shift = 6

			# At most four further octets, adding 7 bits each (at offsets 6, 13, 20 and 27).
			for _ in range(4):
				octet = self.read(1)[0]
				magnitude |= (octet & self.vlq_value_mask) << shift
				shift += 7

				if not (octet & self.vlq_continuation):
					break
			else:
				# Is the value larger than 34 bits? That seems bad
				raise ValueError("Continuation bit set on 5th byte")

		return -magnitude if negative else magnitude