Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ fs.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
FileSystem abstraction to interact with various local and remote filesystems.
"""

from pyarrow.util import _is_path_like, _stringify_path

from pyarrow._fs import (  # noqa
    FileSelector,
    FileType,
    FileInfo,
    FileSystem,
    LocalFileSystem,
    SubTreeFileSystem,
    _MockFileSystem,
    FileSystemHandler,
    PyFileSystem,
    _copy_files,
    _copy_files_selector,
)

# For backward compatibility.
FileStats = FileInfo

_not_imported = []
try:
    from pyarrow._azurefs import AzureFileSystem  # noqa
except ImportError:
    _not_imported.append("AzureFileSystem")

try:
    from pyarrow._hdfs import HadoopFileSystem  # noqa
except ImportError:
    _not_imported.append("HadoopFileSystem")

try:
    from pyarrow._gcsfs import GcsFileSystem  # noqa
except ImportError:
    _not_imported.append("GcsFileSystem")

try:
    from pyarrow._s3fs import (  # noqa
        AwsDefaultS3RetryStrategy, AwsStandardS3RetryStrategy,
        S3FileSystem, S3LogLevel, S3RetryStrategy, ensure_s3_initialized,
        finalize_s3, ensure_s3_finalized, initialize_s3, resolve_s3_region)
except ImportError:
    _not_imported.append("S3FileSystem")
else:
    # GH-38364: we don't initialize S3 eagerly as that could lead
    # to crashes at shutdown even when S3 isn't used.
    # Instead, S3 is initialized lazily using `ensure_s3_initialized`
    # in assorted places.
    import atexit
    atexit.register(ensure_s3_finalized)


def __getattr__(name):
    if name in _not_imported:
        raise ImportError(
            "The pyarrow installation is not built with support for "
            "'{0}'".format(name)
        )

    raise AttributeError(
        "module 'pyarrow.fs' has no attribute '{0}'".format(name)
    )


def _filesystem_from_str(uri):
    # instantiate the file system from an uri, if the uri has a path
    # component then it will be treated as a path prefix
    filesystem, prefix = FileSystem.from_uri(uri)
    prefix = filesystem.normalize_path(prefix)
    if prefix:
        # validate that the prefix is pointing to a directory
        prefix_info = filesystem.get_file_info([prefix])[0]
        if prefix_info.type != FileType.Directory:
            raise ValueError(
                "The path component of the filesystem URI must point to a "
                "directory but it has a type: `{}`. The path component "
                "is `{}` and the given filesystem URI is `{}`".format(
                    prefix_info.type.name, prefix_info.path, uri
                )
            )
        filesystem = SubTreeFileSystem(prefix, filesystem)
    return filesystem


def _ensure_filesystem(filesystem, *, use_mmap=False):
    if isinstance(filesystem, FileSystem):
        return filesystem
    elif isinstance(filesystem, str):
        if use_mmap:
            raise ValueError(
                "Specifying to use memory mapping not supported for "
                "filesystem specified as an URI string"
            )
        return _filesystem_from_str(filesystem)

    # handle fsspec-compatible filesystems
    try:
        import fsspec
    except ImportError:
        pass
    else:
        if isinstance(filesystem, fsspec.AbstractFileSystem):
            if type(filesystem).__name__ == 'LocalFileSystem':
                # In case its a simple LocalFileSystem, use native arrow one
                return LocalFileSystem(use_mmap=use_mmap)
            return PyFileSystem(FSSpecHandler(filesystem))

    raise TypeError(
        "Unrecognized filesystem: {}. `filesystem` argument must be a "
        "FileSystem instance or a valid file system URI'".format(
            type(filesystem))
    )


def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False):
    """
    Return filesystem/path from path which could be an URI or a plain
    filesystem path.
    """
    if not _is_path_like(path):
        if filesystem is not None:
            raise ValueError(
                "'filesystem' passed but the specified path is file-like, so"
                " there is nothing to open with 'filesystem'."
            )
        return filesystem, path

    if filesystem is not None:
        filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map)
        if isinstance(filesystem, LocalFileSystem):
            path = _stringify_path(path)
        elif not isinstance(path, str):
            raise TypeError(
                "Expected string path; path-like objects are only allowed "
                "with a local filesystem"
            )
        path = filesystem.normalize_path(path)
        return filesystem, path

    path = _stringify_path(path)

    # if filesystem is not given, try to automatically determine one
    # first check if the file exists as a local (relative) file path
    # if not then try to parse the path as an URI
    filesystem = LocalFileSystem(use_mmap=memory_map)

    try:
        file_info = filesystem.get_file_info(path)
    except ValueError:  # ValueError means path is likely an URI
        file_info = None
        exists_locally = False
    else:
        exists_locally = (file_info.type != FileType.NotFound)

    # if the file or directory doesn't exists locally, then assume that
    # the path is an URI describing the file system as well
    if not exists_locally:
        try:
            filesystem, path = FileSystem.from_uri(path)
        except ValueError as e:
            # neither an URI nor a locally existing path, so assume that
            # local path was given and propagate a nicer file not found error
            # instead of a more confusing scheme parsing error
            if "empty scheme" not in str(e) \
                    and "Cannot parse URI" not in str(e):
                raise
    else:
        path = filesystem.normalize_path(path)

    return filesystem, path


def copy_files(source, destination,
               source_filesystem=None, destination_filesystem=None,
               *, chunk_size=1024*1024, use_threads=True):
    """
    Copy files between FileSystems.

    This functions allows you to recursively copy directories of files from
    one file system to another, such as from S3 to your local machine.

    Parameters
    ----------
    source : string
        Source file path or URI to a single file or directory.
        If a directory, files will be copied recursively from this path.
    destination : string
        Destination file path or URI. If `source` is a file, `destination`
        is also interpreted as the destination file (not directory).
        Directories will be created as necessary.
    source_filesystem : FileSystem, optional
        Source filesystem, needs to be specified if `source` is not a URI,
        otherwise inferred.
    destination_filesystem : FileSystem, optional
        Destination filesystem, needs to be specified if `destination` is not
        a URI, otherwise inferred.
    chunk_size : int, default 1MB
        The maximum size of block to read before flushing to the
        destination file. A larger chunk_size will use more memory while
        copying but may help accommodate high latency FileSystems.
    use_threads : bool, default True
        Whether to use multiple threads to accelerate copying.

    Examples
    --------
    Inspect an S3 bucket's files:

    >>> s3, path = fs.FileSystem.from_uri(
    ...            "s3://registry.opendata.aws/roda/ndjson/")
    >>> selector = fs.FileSelector(path)
    >>> s3.get_file_info(selector)
    [<FileInfo for 'registry.opendata.aws/roda/ndjson/index.ndjson':...]

    Copy one file from S3 bucket to a local directory:

    >>> fs.copy_files("s3://registry.opendata.aws/roda/ndjson/index.ndjson",
    ...               "file:///{}/index_copy.ndjson".format(local_path))

    >>> fs.LocalFileSystem().get_file_info(str(local_path)+
    ...                                    '/index_copy.ndjson')
    <FileInfo for '.../index_copy.ndjson': type=FileType.File, size=...>

    Copy file using a FileSystem object:

    >>> fs.copy_files("registry.opendata.aws/roda/ndjson/index.ndjson",
    ...               "file:///{}/index_copy.ndjson".format(local_path),
    ...               source_filesystem=fs.S3FileSystem())
    """
    source_fs, source_path = _resolve_filesystem_and_path(
        source, source_filesystem
    )
    destination_fs, destination_path = _resolve_filesystem_and_path(
        destination, destination_filesystem
    )

    file_info = source_fs.get_file_info(source_path)
    if file_info.type == FileType.Directory:
        source_sel = FileSelector(source_path, recursive=True)
        _copy_files_selector(source_fs, source_sel,
                             destination_fs, destination_path,
                             chunk_size, use_threads)
    else:
        _copy_files(source_fs, source_path,
                    destination_fs, destination_path,
                    chunk_size, use_threads)


class FSSpecHandler(FileSystemHandler):
    """
    Handler for fsspec-based Python filesystems.

    https://filesystem-spec.readthedocs.io/en/latest/index.html

    Parameters
    ----------
    fs : FSSpec-compliant filesystem instance

    Examples
    --------
    >>> PyFileSystem(FSSpecHandler(fsspec_fs)) # doctest: +SKIP
    """

    def __init__(self, fs):
        self.fs = fs

    def __eq__(self, other):
        if isinstance(other, FSSpecHandler):
            return self.fs == other.fs
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, FSSpecHandler):
            return self.fs != other.fs
        return NotImplemented

    def get_type_name(self):
        protocol = self.fs.protocol
        if isinstance(protocol, list):
            protocol = protocol[0]
        return "fsspec+{0}".format(protocol)

    def normalize_path(self, path):
        return path

    @staticmethod
    def _create_file_info(path, info):
        size = info["size"]
        if info["type"] == "file":
            ftype = FileType.File
        elif info["type"] == "directory":
            ftype = FileType.Directory
            # some fsspec filesystems include a file size for directories
            size = None
        else:
            ftype = FileType.Unknown
        return FileInfo(path, ftype, size=size, mtime=info.get("mtime", None))

    def get_file_info(self, paths):
        infos = []
        for path in paths:
            try:
                info = self.fs.info(path)
            except FileNotFoundError:
                infos.append(FileInfo(path, FileType.NotFound))
            else:
                infos.append(self._create_file_info(path, info))
        return infos

    def get_file_info_selector(self, selector):
        if not self.fs.isdir(selector.base_dir):
            if self.fs.exists(selector.base_dir):
                raise NotADirectoryError(selector.base_dir)
            else:
                if selector.allow_not_found:
                    return []
                else:
                    raise FileNotFoundError(selector.base_dir)

        if selector.recursive:
            maxdepth = None
        else:
            maxdepth = 1

        infos = []
        selected_files = self.fs.find(
Loading ...