Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
fsspec / implementations / zip.py
Size: Mime:
from __future__ import absolute_import, division, print_function

import datetime
import zipfile

from fsspec import open_files
from fsspec.archive import AbstractArchiveFileSystem
from fsspec.utils import DEFAULT_BLOCK_SIZE


class ZipFileSystem(AbstractArchiveFileSystem):
    """Read contents of ZIP archive as a file-system

    Keeps file object open while instance lives.

    This class is pickleable, but not necessarily thread-safe
    """

    root_marker = ""
    protocol = "zip"
    cachable = False

    def __init__(
        self,
        fo="",
        mode="r",
        target_protocol=None,
        target_options=None,
        block_size=DEFAULT_BLOCK_SIZE,
        **kwargs,
    ):
        """
        Parameters
        ----------
        fo: str or file-like
            Contains ZIP, and must exist. If a str, will fetch file using
            `open_files()`, which must return one file exactly.
        mode: str
            Mode handed to ``zipfile.ZipFile``; when ``fo`` is a string the
            underlying file is opened with this mode plus ``"b"``.
            Currently, only 'r' accepted
        target_protocol: str (optional)
            If ``fo`` is a string, this value can be used to override the
            FS protocol inferred from a URL
        target_options: dict (optional)
            Kwargs passed when instantiating the target FS, if ``fo`` is
            a string.
        block_size: int
            Stored on the instance as ``block_size``.
        kwargs:
            Forwarded unchanged to the parent class initialiser.

        Raises
        ------
        ValueError
            If ``fo`` is a string that does not resolve to exactly one file.
        """
        # ``super()`` already binds ``self``; passing it again would hand the
        # instance to the parent as a spurious positional argument.
        super().__init__(**kwargs)
        if isinstance(fo, str):
            files = open_files(
                fo, mode=mode + "b", protocol=target_protocol, **(target_options or {})
            )
            if len(files) != 1:
                raise ValueError(
                    'Path "{}" did not resolve to exactly '
                    'one file: "{}"'.format(fo, files)
                )
            fo = files[0]
        self.of = fo
        self.fo = fo.__enter__()  # the whole instance is a context
        self.zip = zipfile.ZipFile(self.fo, mode=mode)
        self.block_size = block_size
        self.dir_cache = None

    @classmethod
    def _strip_protocol(cls, path):
        """Return ``path`` as normalised by the parent class, minus leading "/"."""
        # zip file paths are always relative to the archive root
        return super()._strip_protocol(path).lstrip("/")

    def __del__(self):
        """Close the ``ZipFile`` handle, if one was ever opened."""
        # __init__ may have raised before ``self.zip`` was assigned; a
        # finalizer must not fail with AttributeError in that case.
        archive = getattr(self, "zip", None)
        if archive is not None:
            archive.close()

    def _get_dirs(self):
        """Populate ``self.dir_cache`` from the archive listing, once.

        The cache maps each entry name to a dict holding ``name``, ``size``
        and ``type`` plus the attributes named in ``ZipInfo.__slots__``.
        Directories reported by ``_all_dirnames`` are seeded first (with a
        trailing "/"), then overwritten by any explicit archive entry of the
        same name.  Does nothing if the cache is already filled.
        """
        if self.dir_cache is None:
            files = self.zip.infolist()
            self.dir_cache = {
                dirname + "/": {"name": dirname + "/", "size": 0, "type": "directory"}
                for dirname in self._all_dirnames(self.zip.namelist())
            }
            for z in files:
                # A slot that was never assigned raises AttributeError on
                # access, so fall back to None instead of failing the listing.
                f = {s: getattr(z, s, None) for s in zipfile.ZipInfo.__slots__}
                f.update(
                    {
                        "name": z.filename,
                        "size": z.file_size,
                        "type": ("directory" if z.is_dir() else "file"),
                    }
                )
                self.dir_cache[f["name"]] = f

    def pipe_file(self, path, value, **kwargs):
        """Write ``value`` into the archive as the member ``path``.

        Parameters
        ----------
        path: str
            Member name; normalised with ``_strip_protocol`` as in ``_open``.
        value: bytes
            Complete contents of the new member.
        kwargs:
            Accepted for interface compatibility; not used here.
        """
        path = self._strip_protocol(path)
        # override upstream, because we know the exact file size in this case
        info = zipfile.ZipInfo(path, datetime.datetime.now().timetuple()[:6])
        info.file_size = len(value)
        # Opening by bare name would inherit the archive's compression
        # setting; keep that behaviour now that a ZipInfo is passed instead.
        info.compress_type = self.zip.compression
        # Pass the ZipInfo (not the name) so the timestamp and the known
        # size are actually used when the member header is written.
        with self.zip.open(info, "w") as f:
            f.write(value)
        # The listing has changed; force _get_dirs to rebuild it on next use.
        self.dir_cache = None

    def _open(
        self,
        path,
        mode="rb",
        block_size=None,
        autocommit=True,
        cache_options=None,
        **kwargs,
    ):
        """Open the archive member ``path`` and return the file object.

        ``mode`` has any leading/trailing "b" removed before being passed to
        ``ZipFile.open``.  For read modes the returned object additionally
        gets ``size`` and ``name`` attributes taken from ``self.info(path)``.
        ``block_size``, ``autocommit``, ``cache_options`` and ``kwargs`` are
        accepted but not used by this implementation.
        """
        path = self._strip_protocol(path)
        out = self.zip.open(path, mode.strip("b"))
        if "r" in mode:
            info = self.info(path)
            out.size = info["size"]
            out.name = info["name"]
        return out