# Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages
#
# alkaline-ml / joblib   python
#
# Repository URL to install this package:
#
# / compressor.py

"""Classes and functions for managing compressors."""

import sys
import io
import zlib
from distutils.version import LooseVersion

from ._compat import _basestring, PY3_OR_LATER

try:
    from threading import RLock
except ImportError:
    from dummy_threading import RLock

try:
    import bz2
except ImportError:
    bz2 = None

try:
    import lzma
except ImportError:
    lzma = None

try:
    import lz4
    if PY3_OR_LATER:
        from lz4.frame import LZ4FrameFile
except ImportError:
    lz4 = None

# Message of the ValueError raised by LZ4CompressorWrapper._check_versions
# when the lz4 package is missing or older than 0.19.
LZ4_NOT_INSTALLED_ERROR = ('LZ4 is not installed. Install it with pip: '
                           'https://python-lz4.readthedocs.io/')

# Registered compressors, keyed by compressor name. Entries are
# CompressorWrapper instances added through register_compressor().
_COMPRESSORS = {}

# Magic numbers of supported compression file formats. The BZ2, LZMA, XZ and
# LZ4 wrapper classes in this module expose theirs as the `prefix` attribute.
_ZFILE_PREFIX = b'ZF'  # used with pickle files created before 0.9.3.
_ZLIB_PREFIX = b'\x78'
_GZIP_PREFIX = b'\x1f\x8b'
_BZ2_PREFIX = b'BZ'
_XZ_PREFIX = b'\xfd\x37\x7a\x58\x5a'
_LZMA_PREFIX = b'\x5d\x00'
_LZ4_PREFIX = b'\x04\x22\x4D\x18'


def register_compressor(compressor_name, compressor,
                        force=False):
    """Add a compressor to the module-level registry.

    Parameters
    ----------
    compressor_name: str.
        The name under which the compressor is stored.
    compressor: CompressorWrapper
        An instance of a 'CompressorWrapper'. When its 'fileobj_factory'
        attribute is not None, it must expose 'read', 'write', 'seek' and
        'tell' attributes.
    force: bool
        When true, silently replace a compressor already registered under
        the same name instead of raising.

    Raises
    ------
    ValueError
        If the name is not a string, the compressor is not a
        CompressorWrapper, its factory lacks the file object interface, or
        the name is already taken and 'force' is false.
    """
    if not isinstance(compressor_name, _basestring):
        raise ValueError("Compressor name should be a string, "
                         "'{}' given.".format(compressor_name))

    if not isinstance(compressor, CompressorWrapper):
        raise ValueError("Compressor should implement the CompressorWrapper "
                         "interface, '{}' given.".format(compressor))

    # A wrapper without a factory (e.g. optional backend not available) is
    # accepted as is; otherwise the factory must look like a file object.
    factory = compressor.fileobj_factory
    if factory is not None:
        file_api = ('read', 'write', 'seek', 'tell')
        if not all(hasattr(factory, attr) for attr in file_api):
            raise ValueError("Compressor 'fileobj_factory' attribute should "
                             "implement the file object interface, '{}' given."
                             .format(factory))

    if compressor_name in _COMPRESSORS and not force:
        raise ValueError("Compressor '{}' already registered."
                         .format(compressor_name))

    # Mutating the registry in place does not require a `global` statement.
    _COMPRESSORS[compressor_name] = compressor


class CompressorWrapper():
    """A wrapper around a compressor file object.

    Attributes
    ----------
    fileobj_factory: callable
        Set from the 'obj' constructor argument. It is called as
        ``fileobj_factory(fileobj, 'wb')`` (optionally with a
        ``compresslevel`` keyword) to compress and as
        ``fileobj_factory(fileobj, 'rb')`` to decompress.
    prefix: bytestring
        A bytestring corresponding to the magic number that identifies the
        file format associated to the compressor.
    extension: str
        The file extension used to automatically select this compressor during
        a dump to a file.
    """

    def __init__(self, obj, prefix=b'', extension=''):
        self.fileobj_factory, self.prefix, self.extension = (
            obj, prefix, extension)

    def compressor_file(self, fileobj, compresslevel=None):
        """Return a file object that compresses what is written to it.

        'compresslevel' is forwarded to the factory only when it is not None.
        """
        options = {}
        if compresslevel is not None:
            options['compresslevel'] = compresslevel
        return self.fileobj_factory(fileobj, 'wb', **options)

    def decompressor_file(self, fileobj):
        """Return a file object that decompresses what is read from it."""
        factory = self.fileobj_factory
        return factory(fileobj, 'rb')


class BZ2CompressorWrapper(CompressorWrapper):
    """Compressor wrapper backed by ``bz2.BZ2File``.

    ``fileobj_factory`` is None when the ``bz2`` module could not be
    imported; both file constructors then raise a ValueError.
    """

    prefix = _BZ2_PREFIX
    extension = '.bz2'

    def __init__(self):
        self.fileobj_factory = None if bz2 is None else bz2.BZ2File

    def _check_versions(self):
        """Raise a ValueError when the bz2 module is unavailable."""
        if bz2 is not None:
            return
        raise ValueError('bz2 module is not compiled on your python '
                         'standard library.')

    def compressor_file(self, fileobj, compresslevel=None):
        """Return a bz2 file object opened for writing on 'fileobj'.

        'compresslevel' is forwarded to the factory only when it is not None.
        """
        self._check_versions()
        options = {}
        if compresslevel is not None:
            options['compresslevel'] = compresslevel
        return self.fileobj_factory(fileobj, 'wb', **options)

    def decompressor_file(self, fileobj):
        """Return a bz2 file object opened for reading on 'fileobj'."""
        self._check_versions()
        # In python 2, BZ2File doesn't support a fileobj opened in binary
        # mode, so the file name is handed over instead of the object.
        source = fileobj if PY3_OR_LATER else fileobj.name
        return self.fileobj_factory(source, 'rb')


class LZMACompressorWrapper(CompressorWrapper):
    """Compressor wrapper for the legacy ``.lzma`` container.

    Files are written with ``lzma.LZMAFile`` using ``lzma.FORMAT_ALONE``.
    ``fileobj_factory`` is None when the ``lzma`` module could not be
    imported.
    """

    prefix = _LZMA_PREFIX
    extension = '.lzma'

    def __init__(self):
        if lzma is not None:
            self.fileobj_factory = lzma.LZMAFile
        else:
            self.fileobj_factory = None

    def _check_versions(self):
        """Raise a ValueError when the lzma module is unavailable."""
        if lzma is None:
            raise ValueError('lzma module is not compiled on your python '
                             'standard library.')

    def compressor_file(self, fileobj, compresslevel=None):
        """Return an LZMA file object opened for writing on 'fileobj'.

        'compresslevel', when not None, is passed as the ``preset`` keyword.

        Raises
        ------
        ValueError
            If the lzma module is not available. Without this guard the call
            failed with an AttributeError on ``lzma.FORMAT_ALONE``.
        """
        self._check_versions()
        if compresslevel is None:
            return self.fileobj_factory(fileobj, 'wb',
                                        format=lzma.FORMAT_ALONE)
        else:
            return self.fileobj_factory(fileobj, 'wb',
                                        format=lzma.FORMAT_ALONE,
                                        preset=compresslevel)

    def decompressor_file(self, fileobj):
        """Return an LZMA file object opened for reading on 'fileobj'.

        Raises
        ------
        NotImplementedError
            On python 2, or when the lzma module is not available.
        """
        if PY3_OR_LATER and lzma is not None:
            # We support lzma only in python 3 because in python 2 users
            # may have installed the pyliblzma package, which also provides
            # the lzma module, but that unfortunately doesn't fully support
            # the buffer interface required by joblib.
            # See https://github.com/joblib/joblib/issues/403 for details.
            return lzma.LZMAFile(fileobj, 'rb')
        else:
            raise NotImplementedError("Lzma decompression is not "
                                      "supported for this version of "
                                      "python ({}.{})"
                                      .format(sys.version_info[0],
                                              sys.version_info[1]))


class XZCompressorWrapper(LZMACompressorWrapper):
    """Compressor wrapper for the ``.xz`` container.

    Files are written with ``lzma.LZMAFile`` using ``lzma.CHECK_NONE``.
    Decompression is inherited from LZMACompressorWrapper.
    """

    prefix = _XZ_PREFIX
    extension = '.xz'

    def __init__(self):
        if lzma is not None:
            self.fileobj_factory = lzma.LZMAFile
        else:
            self.fileobj_factory = None

    def compressor_file(self, fileobj, compresslevel=None):
        """Return an XZ file object opened for writing on 'fileobj'.

        'compresslevel', when not None, is passed as the ``preset`` keyword.

        Raises
        ------
        ValueError
            If the lzma module is not available. Without this guard the call
            failed with an AttributeError on ``lzma.CHECK_NONE``.
        """
        if lzma is None:
            raise ValueError('lzma module is not compiled on your python '
                             'standard library.')
        if compresslevel is None:
            return self.fileobj_factory(fileobj, 'wb', check=lzma.CHECK_NONE)
        else:
            return self.fileobj_factory(fileobj, 'wb', check=lzma.CHECK_NONE,
                                        preset=compresslevel)


class LZ4CompressorWrapper(CompressorWrapper):
    """Compressor wrapper backed by ``lz4.frame.LZ4FrameFile``.

    ``fileobj_factory`` is None on python 2 or when the lz4 package could not
    be imported; both file constructors then raise a ValueError.
    """

    prefix = _LZ4_PREFIX
    extension = '.lz4'

    def __init__(self):
        lz4_usable = PY3_OR_LATER and lz4 is not None
        self.fileobj_factory = LZ4FrameFile if lz4_usable else None

    def _check_versions(self):
        """Raise a ValueError unless a usable lz4 (>= 0.19) is available."""
        if not PY3_OR_LATER:
            raise ValueError('lz4 compression is only available with '
                             'python3+.')
        if lz4 is None:
            raise ValueError(LZ4_NOT_INSTALLED_ERROR)

        # Drop a single leading "v" from the version string before comparing.
        installed = lz4.__version__
        installed = installed[1:] if installed.startswith("v") else installed
        too_old = LooseVersion(installed) < LooseVersion('0.19')
        if too_old:
            raise ValueError(LZ4_NOT_INSTALLED_ERROR)

    def compressor_file(self, fileobj, compresslevel=None):
        """Return an lz4 frame file object opened for writing on 'fileobj'.

        'compresslevel', when not None, is passed to the factory as the
        ``compression_level`` keyword.
        """
        self._check_versions()
        options = {}
        if compresslevel is not None:
            options['compression_level'] = compresslevel
        return self.fileobj_factory(fileobj, 'wb', **options)

    def decompressor_file(self, fileobj):
        """Return an lz4 frame file object opened for reading on 'fileobj'."""
        self._check_versions()
        factory = self.fileobj_factory
        return factory(fileobj, 'rb')


###############################################################################
#  base file compression/decompression object definition
# Values stored in BinaryZlibFile._mode.
# Initial state set by the constructor; close() returns immediately in it.
_MODE_CLOSED = 0
# Set when the file is opened with mode "rb".
_MODE_READ = 1
# Handled like _MODE_READ by close(). NOTE(review): presumably set once the
# compressed stream is exhausted; the assignment is not in the visible code.
_MODE_READ_EOF = 2
# Set when the file is opened with mode "wb".
_MODE_WRITE = 3
# NOTE(review): presumably the read chunk size in bytes; it is not used in
# the part of the file visible here -- confirm against the read path.
_BUFFER_SIZE = 8192


class BinaryZlibFile(io.BufferedIOBase):
    """A file object providing transparent zlib (de)compression.

    A BinaryZlibFile can act as a wrapper for an existing file object, or refer
    directly to a named file on disk.

    Note that BinaryZlibFile provides only a *binary* file interface: data read
    is returned as bytes, and data to be written should be given as bytes.

    This object is an adaptation of the BZ2File object and is compatible with
    versions of python >= 2.7.

    If filename is a str or bytes object, it gives the name
    of the file to be opened. Otherwise, it should be a file object,
    which will be used to read or write the compressed data.

    mode can be 'rb' for reading (default) or 'wb' for (over)writing

    If mode is 'wb', compresslevel can be a number between 1
    and 9 specifying the level of compression: 1 produces the least
    compression, and 9 produces the most compression. 3 is the default.
    """

    wbits = zlib.MAX_WBITS

    def __init__(self, filename, mode="rb", compresslevel=3):
        """Open a zlib-compressed file for reading or writing.

        Parameters
        ----------
        filename: str, bytes or file object
            A name is opened with io.open() and closed by close(). An object
            with a 'read' or 'write' attribute is used as is and left open.
        mode: str
            Either "rb" (default) or "wb".
        compresslevel: int
            Compression level between 1 and 9. It is validated in both
            modes but only used to build the compressor in "wb" mode.

        Raises
        ------
        ValueError
            If 'compresslevel' is not an integer between 1 and 9, or if
            'mode' is neither "rb" nor "wb".
        TypeError
            If 'filename' is neither a string nor a file object.
        """
        # This lock must be recursive, so that BufferedIOBase's
        # readline(), readlines() and writelines() don't deadlock.
        self._lock = RLock()
        self._fp = None
        self._closefp = False
        self._mode = _MODE_CLOSED
        self._pos = 0
        self._size = -1
        self.compresslevel = compresslevel

        if not isinstance(compresslevel, int) or not (1 <= compresslevel <= 9):
            raise ValueError("'compresslevel' must be an integer "
                             "between 1 and 9. You provided 'compresslevel={}'"
                             .format(compresslevel))

        if mode == "rb":
            mode_code = _MODE_READ
            self._decompressor = zlib.decompressobj(self.wbits)
            self._buffer = b""
            self._buffer_offset = 0
        elif mode == "wb":
            mode_code = _MODE_WRITE
            self._compressor = zlib.compressobj(self.compresslevel,
                                                zlib.DEFLATED, self.wbits,
                                                zlib.DEF_MEM_LEVEL, 0)
        else:
            raise ValueError("Invalid mode: %r" % (mode,))

        if isinstance(filename, _basestring):
            self._fp = io.open(filename, mode)
            self._closefp = True
        elif hasattr(filename, "read") or hasattr(filename, "write"):
            self._fp = filename
        else:
            raise TypeError("filename must be a str or bytes object, "
                            "or a file")

        # Mark the object as open only once a file object is attached. If
        # io.open() or the TypeError above aborts construction, the mode
        # stays _MODE_CLOSED, so a later close() (e.g. from the finalizer)
        # returns early instead of calling write() on a None file object.
        self._mode = mode_code

    def close(self):
        """Flush and close the file.

        May be called more than once without error. Once the file is
        closed, any other operation on it will raise a ValueError.
        """
        with self._lock:
            if self._mode == _MODE_CLOSED:
                return
            try:
                if self._mode in (_MODE_READ, _MODE_READ_EOF):
                    self._decompressor = None
                elif self._mode == _MODE_WRITE:
                    self._fp.write(self._compressor.flush())
                    self._compressor = None
            finally:
                try:
                    if self._closefp:
                        self._fp.close()
                finally:
                    self._fp = None
                    self._closefp = False
# Loading ...  (page-loader placeholder; the rest of the file is not included in this capture)