Source code for bethesda_structs.archive._common
# Copyright (c) 2018 Stephen Bunn <stephen@bunn.io>
# MIT License <https://choosealicense.com/licenses/mit/>
import os
import abc
from typing import Generic, TypeVar, Callable, Generator
from pathlib import Path
import attr
from construct import Construct, Container, StreamError
from .._common import BaseFiletype
T_BaseArchive = TypeVar("BaseArchive")
[docs]@attr.s
class ArchiveFile(object):
"""A generic archive file object that can be used for extracting.
The purpose of this object is to provide some generic format for
:func:`~BaseArchive.iter_files` to yield so that the :func:`~BaseArchive.extract`
method can be abstracted away from the archive subclasses.
"""
filepath = attr.ib(type=str, converter=Path)
"""The relative filepath of the archived file.
Returns:
str: The relative filepath of the archived file
"""
data = attr.ib(type=bytes, repr=False)
"""The raw data of the archived file.
Returns:
bytes: The raw data of the archived file
"""
@property
def size(self) -> int:
"""The size of the raw data.
Returns:
int: The size of the raw data
"""
return len(self.data)
[docs]@attr.s
class BaseArchive(BaseFiletype, abc.ABC, Generic[T_BaseArchive]):
"""The base class all Archives should subclass.
"""
content = attr.ib(type=bytes, repr=False)
filepath = attr.ib(type=str, default=None)
container = attr.ib(type=Container, default=None, repr=False, init=False)
def __attrs_post_init__(self):
"""Initializes the non-init attributes.
"""
if self.filepath:
self.filepath = Path(self.filepath)
try:
self.container = self.archive_struct.parse(self.content)
except StreamError as exc:
raise ValueError(
(
f"parsing {self.__class__.__name__} archive struct is not large "
f"enough, {exc}"
)
)
@abc.abstractproperty
def archive_struct(self) -> Construct:
"""The base archive structure to use for parsing the **full** archive.
Raises:
NotImplementedError: Subclasses must implement
Returns:
:class:`construct.Construct`: The archive structure
"""
raise NotImplementedError
[docs] @classmethod
def parse(cls, content: bytes, filepath: str = None) -> T_BaseArchive:
"""Create a :class:`BaseArchive` from a byte array.
Args:
content (bytes): The byte content of the archive
filepath (str, optional): Defaults to None.
Sets the filepath attribute for user's reference
Raises:
ValueError: If the given content is not of bytes
Returns:
:class:`BaseArchive`: An archive instance
"""
if not isinstance(content, bytes):
raise ValueError(
f"given content must be of bytes, recieved {type(content)!r}"
)
return cls(content, filepath=filepath)
[docs] @abc.abstractmethod
def iter_files(self) -> Generator[ArchiveFile, None, None]:
"""Iterates over the available files in the archive.
Yields:
ArchiveFile: An archive file
Raises:
NotImplementedError: Subclasses must implement
"""
raise NotImplementedError
[docs] def extract(
self, to_dir: str, progress_hook: Callable[[int, int, str], None] = None
):
"""Extracts the content of the `BaseArchive` to the given directory.
Args:
to_dir (str): The directory to extract the content to
progress_hook (Callable[[int, int, str], None], optional): Defaults to None.
A progress hook that should expect (``current``, ``total``,
``current_filepath``) as arguments
Example:
>>> FILEPATH = "" # absolute path to BSA/BTDX archive
>>> archive = bethesda_structs.archive.get_archive(FILEPATH)
>>> archive.extract('/home/username/Downloads/extracted')
Example:
>>> def progress_hook(current, total, filepath):
... print((current / total) * 100.0)
>>> FILEPATH = "" # absolute path to BSA/BTDX archive
>>> archive = bethesda_structs.archive.get_archive(FILEPATH)
>>> archive.extract(
... '/home/username/Downloads/extracted',
... progress_hook=progress_hook
... )
0.0
12.2
12.2
12.67
12.67
50.4443
50.4443
70.0
70.0
92.1
92.1
100.0
Note:
The provided progress hook is simple and two-stage. It is called once
before a file is being written and once after the same file is done
being written.
"""
if not os.path.isdir(to_dir):
raise NotADirectoryError(f"no directory {to_dir!r} exists")
to_dir = Path(to_dir)
archive_files = list(self.iter_files())
total_size = sum(entry.size for entry in archive_files)
current_size = 0
for entry in archive_files:
to_path = to_dir.joinpath(entry.filepath)
if callable(progress_hook):
progress_hook(current_size, total_size, to_path.as_posix())
if not to_path.parent.is_dir():
to_path.parent.mkdir(parents=True)
with to_path.open("wb") as stream:
stream.write(entry.data)
current_size += entry.size
if callable(progress_hook):
progress_hook(current_size, total_size, to_path.as_posix())