Source code for bethesda_structs.archive.bsa

# Copyright (c) 2018 Stephen Bunn <stephen@bunn.io>
# MIT License <https://choosealicense.com/licenses/mit/>

from typing import Generator
from pathlib import PureWindowsPath

import lz4.frame
from construct import (
    If,
    Array,
    Bytes,
    Struct,
    VarInt,
    Adapter,
    CString,
    Int32ul,
    Int64ul,
    Container,
    FlagsEnum,
    Compressed,
    IfThenElse,
    GreedyBytes,
    PascalString,
)

from ._common import ArchiveFile, BaseArchive


[docs]class LZ4CompressedAdapter(Adapter): """An adapter for LZ4 compressed data. Note: v105 BSA's utilize `LZ4 <https://github.com/lz4/lz4>`_ compression instead of `zlib <https://zlib.net/>`_. """ def _decode(self, obj: bytes, context: Container, path: str) -> bytes: """Decompresses the given bytes. Args: obj (bytes): The LZ4 compressed bytes context (Container): The context container path (str): The construct path Returns: bytes: The resulting decompressed bytes """ return lz4.frame.decompress(obj) def _encode(self, obj: bytes, context: Container, path: str) -> bytes: """Compresses the given bytes. Args: obj (bytes): The uncompressed bytes context (Container): The context container path (str): The construct path Returns: bytes: The resulting compressed bytes """ return lz4.frame.compress(obj)
[docs]class BSAArchive(BaseArchive): """Archive type for BSA files. BSA stands for "Bethesda ? Archive". These archives are compressed meshes, textures and other static resources that can be loaded as a single file instead of a directory of files (loose files). There are currently 4 versions of BSA: - ???: Morrowind - 103: Oblivion - 104: Fallout 3, Fallout: New Vegas, and Skyrim - 105: Skyrim: Special Edition Note: BSA archives to not read the file data on initialization. Header's, records and names are read in and files are built during :func:`~BSAArchive.iter_files`. **Credit:** - `BAE <https://github.com/jonwd7/bae>`_ """ SIZE_MASK = 0x3fffffff COMPRESSED_MASK = 0xc0000000 header_struct = Struct( "magic" / Bytes(4), "version" / Int32ul, "directory_offset" / Int32ul, "archive_flags" / FlagsEnum( Int32ul, directories_named=0x001, files_named=0x002, files_compressed=0x004, _unknown_0=0x008, _unknown_1=0x010, _unknown_2=0x020, xbox360_archive=0x040, files_prefixed=0x100, _unknown_4=0x200, _unknown_5=0x400, ), "directory_count" / Int32ul, "file_count" / Int32ul, "directory_names_length" / Int32ul, "file_names_length" / Int32ul, "file_flags" / FlagsEnum( Int32ul, nif=0x001, dds=0x002, xml=0x004, wav=0x008, mp3=0x010, txt=0x020, html=0x020, bat=0x020, scc=0x020, spt=0x040, tex=0x080, fnt=0x080, ctl=0x100, ), ) """The structure of BSA headers. Returns: :class:`~construct.core.Struct`: The structure of BSA headers """ directory_record_struct = Struct( "hash" / Int64ul, "file_count" / Int32ul, "_unknown_0" / If(lambda this: this._.header.version >= 105, Int32ul), "name_offset" / IfThenElse(lambda this: this._.header.version >= 105, Int64ul, Int32ul), ) """The structure of directory records. Returns: :class:`~construct.core.Struct`: The structure of directory records """ file_record_struct = Struct("hash" / Int64ul, "size" / Int32ul, "offset" / Int32ul) """The structure of file records. Returns: :class:`~construct.core.Struct`: The structure of file records """ directory_block_struct = Struct( "name" / If( lambda this: this._.header.archive_flags.directories_named, PascalString(VarInt, "utf8"), ), "file_records" / Array( lambda this: this._.directory_records[this._._index].file_count, file_record_struct, ), ) """The structure of directory blocks. Returns: :class:`~construct.core.Struct`: The structure of directory blocks """ archive_struct = Struct( "header" / header_struct, "directory_records" / Array(lambda this: this.header.directory_count, directory_record_struct), "directory_blocks" / Array(lambda this: this.header.directory_count, directory_block_struct), "file_names" / If( lambda this: this.header.archive_flags.files_named, Array(lambda this: this.header.file_count, CString("utf8")), ), ) """The **partial** structure of BSA archives. Return: :class:`~construct.core.Struct`: The **partial** structure of BSA archives """ @property def uncompressed_file_struct(self) -> Struct: """The uncompressed file structure for uncompressed files. Returns: :class:`~construct.core.Struct`: The uncompressed file structure for uncompressed files. """ return Struct("data" / GreedyBytes) @property def compressed_file_struct(self) -> Struct: """The compressed file structure for compressed files. Returns: :class:`~construct.core.Struct`: The compressed file structure for compressed files. """ return Struct( "original_size" / Int32ul, "data" / IfThenElse( self.container.header.version >= 105, LZ4CompressedAdapter(GreedyBytes), Compressed(GreedyBytes, "zlib"), ), )
[docs] @classmethod def can_handle(cls, filepath: str) -> bool: """Determines if a given file can be handled by the current archive. Args: filepath (str): The filepath to check if can be handled """ header = cls.header_struct.parse_file(filepath) return header.magic == b"BSA\x00" and header.version in (103, 104, 105)
[docs] def iter_files(self) -> Generator[ArchiveFile, None, None]: """Iterates over the parsed data and yields instances of :class:`.ArchiveFile`. Yields: :class:`.ArchiveFile`: An file contained within the archive """ file_index = 0 file_struct = self.uncompressed_file_struct if self.container.header.archive_flags.files_compressed: file_struct = self.compressed_file_struct for directory_block in self.container.directory_blocks: # get directory path from directory block directory_path = PureWindowsPath(directory_block.name[:-1]) for file_record in directory_block.file_records: # choose the compressed file structure if compressed mask is set if file_record.size > 0 and ( self.container.header.archive_flags.files_compressed != bool(file_record.size & self.COMPRESSED_MASK) ): file_struct = self.compressed_file_struct file_container = file_struct.parse( self.content[ file_record.offset : ( file_record.offset + (file_record.size & self.SIZE_MASK) ) ] ) yield ArchiveFile( filepath=directory_path.joinpath( self.container.file_names[file_index] ), data=file_container.data, ) file_index += 1