Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / air / _internal / remote_storage.py
Size: Mime:
import fnmatch
import os
import urllib.parse
from typing import List, Optional, Tuple

from ray.air._internal.filelock import TempFileLock

try:
    import fsspec

except ImportError:
    fsspec = None

# pyarrow is an optional dependency: when it cannot be imported, both
# `pyarrow` and `_CustomGCSHandler` are bound to None so the rest of the
# module can test for availability instead of failing at import time.
try:
    import pyarrow
    import pyarrow.fs

    # The handler subclasses a pyarrow type, so it can only be defined when
    # the pyarrow import above succeeded -- hence its place inside the `try`.
    # TODO(krfricke): Remove this once gcsfs > 2022.3.0 is released
    # (and make sure to pin)
    class _CustomGCSHandler(pyarrow.fs.FSSpecHandler):
        """Custom FSSpecHandler that avoids a bug in gcsfs <= 2022.3.0."""

        def create_dir(self, path, recursive):
            """Create ``path`` on the wrapped fsspec filesystem.

            ``recursive`` is accepted for interface compatibility but is
            not forwarded to ``mkdir``. A directory that already exists is
            treated as success.
            """
            try:
                # GCSFS doesn't expose `create_parents` argument,
                # so it is omitted here
                self.fs.mkdir(path)
            except FileExistsError:
                # Already present: nothing to do.
                pass

# NOTE: ModuleNotFoundError is a subclass of ImportError, so listing both is
# redundant but harmless.
except (ImportError, ModuleNotFoundError):
    pyarrow = None
    _CustomGCSHandler = None

from ray import logger


def _assert_pyarrow_installed():
    """Raise ``RuntimeError`` if pyarrow could not be imported.

    Does nothing when the module-level ``pyarrow`` name is bound to the
    imported package.

    Raises:
        RuntimeError: If ``pyarrow`` is ``None``.
    """
    if pyarrow is not None:
        return

    raise RuntimeError(
        "Uploading, downloading, and deleting from cloud storage "
        "requires pyarrow to be installed. Install with: "
        "`pip install pyarrow`. Subsequent calls to cloud operations "
        "will be ignored."
    )


def fs_hint(uri: str) -> str:
    """Return a hint on how to install the filesystem package for ``uri``.

    The hint is meant to be embedded in error messages, so this function
    always returns a string and does not raise for unknown protocols.

    Args:
        uri: The URI whose scheme determines which package is needed.

    Returns:
        An installation hint: for pyarrow or fsspec themselves when they are
        missing, the protocol-specific message from the fsspec registry when
        one is available, or a generic fallback otherwise.
    """
    if pyarrow is None:
        return "Please make sure PyArrow is installed: `pip install pyarrow`."
    if fsspec is None:
        return "Try installing fsspec: `pip install fsspec`."

    from fsspec.registry import known_implementations

    fallback = "Make sure to install and register your fsspec-compatible filesystem."
    protocol = urllib.parse.urlparse(uri).scheme

    # A registry entry is not guaranteed to carry an "err" message (some only
    # declare the implementing class). Indexing it directly would raise a
    # KeyError and hide the error this hint is being generated for.
    return known_implementations.get(protocol, {}).get("err", fallback)


def is_non_local_path_uri(uri: str) -> bool:
    """Tell whether ``uri`` refers to a resolvable non-local location.

    URIs without a scheme and ``file://`` URIs are local. Anything else
    counts as non-local only if a filesystem can be resolved for it.
    """
    scheme = urllib.parse.urlparse(uri).scheme
    if not scheme or scheme == "file":
        return False

    resolved_fs, _ = get_fs_and_path(uri)
    return bool(resolved_fs)


# Cache of resolved filesystem objects, keyed by the ``(scheme, netloc)`` pair
# of the URI they were created for. ``get_fs_and_path`` fills and reads it so
# that repeated lookups for the same remote reuse one filesystem instance.
_cached_fs = {}


def get_fs_and_path(
    uri: str,
) -> Tuple[Optional["pyarrow.fs.FileSystem"], Optional[str]]:
    """Resolve a URI into a pyarrow filesystem and a path on that filesystem.

    Resolution is attempted natively through pyarrow first and, if pyarrow
    does not recognize the URI, through fsspec (wrapped in a pyarrow
    ``PyFileSystem``). Resolved filesystems are cached in ``_cached_fs``
    under ``(scheme, netloc)``.

    Args:
        uri: The URI to resolve.

    Returns:
        A ``(filesystem, path)`` tuple, or ``(None, None)`` when pyarrow is
        not installed, when pyarrow rejects the URI and fsspec is not
        installed, or when fsspec does not know the protocol.
    """
    if not pyarrow:
        return None, None

    parsed = urllib.parse.urlparse(uri)
    # Default path: netloc and path concatenated, with the scheme stripped.
    path = parsed.netloc + parsed.path

    cache_key = (parsed.scheme, parsed.netloc)

    if cache_key in _cached_fs:
        fs = _cached_fs[cache_key]
        # NOTE(review): on a cache hit the path is the netloc+path string
        # computed above, not the one `from_uri` would return -- confirm the
        # two agree for every supported scheme.
        return fs, path

    try:
        # Native pyarrow resolution; this also supplies the path to use.
        fs, path = pyarrow.fs.FileSystem.from_uri(uri)
        _cached_fs[cache_key] = fs
        return fs, path
    except (pyarrow.lib.ArrowInvalid, pyarrow.lib.ArrowNotImplementedError):
        # Raised when URI not recognized
        if not fsspec:
            # Only return if fsspec is not installed
            return None, None
        # Otherwise fall through to the fsspec lookup below.

    # Else, try to resolve protocol via fsspec
    try:
        fsspec_fs = fsspec.filesystem(parsed.scheme)
    except ValueError:
        # Raised when protocol not known
        return None, None

    fsspec_handler = pyarrow.fs.FSSpecHandler
    if parsed.scheme in ["gs", "gcs"]:
        # GS doesn't support `create_parents` arg in `create_dir()`
        fsspec_handler = _CustomGCSHandler

    # Wrap the fsspec filesystem so callers get a pyarrow filesystem object.
    fs = pyarrow.fs.PyFileSystem(fsspec_handler(fsspec_fs))
    _cached_fs[cache_key] = fs

    return fs, path


def delete_at_uri(uri: str):
    """Delete the directory that a remote URI points to.

    A failure of the delete itself is logged as a warning and not
    re-raised.

    Args:
        uri: URI of the remote directory to delete.

    Raises:
        RuntimeError: If pyarrow is not installed.
        ValueError: If no filesystem can be resolved for ``uri``.
    """
    _assert_pyarrow_installed()

    remote_fs, remote_path = get_fs_and_path(uri)
    if remote_fs:
        try:
            remote_fs.delete_dir(remote_path)
        except Exception as exc:
            # Best effort: report the problem but do not fail the caller.
            logger.warning(f"Caught exception when clearing URI `{uri}`: {exc}")
        return

    raise ValueError(
        f"Could not clear URI contents: "
        f"URI `{uri}` is not a valid or supported cloud target. "
        f"Hint: {fs_hint(uri)}"
    )


def download_from_uri(uri: str, local_path: str, filelock: bool = True):
    """Copy the contents at a remote URI to a local path.

    Args:
        uri: URI of the remote source.
        local_path: Local destination path.
        filelock: If true, hold a ``TempFileLock`` named after the
            normalized ``local_path`` (with a ``.lock`` suffix) while
            copying.

    Raises:
        RuntimeError: If pyarrow is not installed.
        ValueError: If no filesystem can be resolved for ``uri``.
    """
    _assert_pyarrow_installed()

    source_fs, remote_path = get_fs_and_path(uri)
    if not source_fs:
        raise ValueError(
            f"Could not download from URI: "
            f"URI `{uri}` is not a valid or supported cloud target. "
            f"Hint: {fs_hint(uri)}"
        )

    if not filelock:
        pyarrow.fs.copy_files(remote_path, local_path, source_filesystem=source_fs)
        return

    lock_name = f"{os.path.normpath(local_path)}.lock"
    with TempFileLock(lock_name):
        pyarrow.fs.copy_files(remote_path, local_path, source_filesystem=source_fs)


def upload_to_uri(
    local_path: str, uri: str, exclude: Optional[List[str]] = None
) -> None:
    """Copy a local path to a remote URI, optionally skipping files.

    Args:
        local_path: Local source path.
        uri: URI of the remote destination.
        exclude: Optional list of ``fnmatch`` patterns. When given and
            non-empty, the local tree is walked and matching files are
            not uploaded; otherwise everything is copied in one call.

    Raises:
        RuntimeError: If pyarrow is not installed.
        ValueError: If no filesystem can be resolved for ``uri``.
    """
    _assert_pyarrow_installed()

    target_fs, remote_path = get_fs_and_path(uri)
    if not target_fs:
        raise ValueError(
            f"Could not upload to URI: "
            f"URI `{uri}` is not a valid or supported cloud target. "
            f"Hint: {fs_hint(uri)}"
        )

    if exclude:
        # Patterns were supplied: upload file by file so matches can be skipped.
        return _upload_to_uri_with_exclude(
            local_path=local_path,
            fs=target_fs,
            bucket_path=remote_path,
            exclude=exclude,
        )

    # Nothing to filter: hand the whole path to pyarrow at once.
    pyarrow.fs.copy_files(local_path, remote_path, destination_filesystem=target_fs)


def _upload_to_uri_with_exclude(
    local_path: str,
    fs: "pyarrow.fs.FileSystem",
    bucket_path: str,
    exclude: Optional[List[str]],
) -> None:
    """Upload a local directory tree file by file, skipping excluded files.

    Args:
        local_path: Root of the local directory tree to upload.
        fs: Destination filesystem.
        bucket_path: Destination root path on ``fs``.
        exclude: ``fnmatch`` patterns matched against each file's path
            relative to ``local_path``. ``None`` or an empty list excludes
            nothing.

    Note:
        The relative path of a file located directly in ``local_path`` is
        built by joining ``"."`` with the file name, so patterns are matched
        against that form rather than against the bare file name.
    """
    # Treat a missing pattern list as "exclude nothing" instead of failing
    # when iterating over None.
    patterns = exclude or []

    for root, _dirs, files in os.walk(local_path):
        rel_root = os.path.relpath(root, local_path)
        for file_name in files:
            candidate = os.path.join(rel_root, file_name)

            if any(fnmatch.fnmatch(candidate, pattern) for pattern in patterns):
                continue

            # normpath collapses the "./" prefix of files in the root dir.
            # NOTE(review): the remote path is built with os.path as well --
            # confirm this is acceptable on platforms whose separator is
            # not "/".
            full_source_path = os.path.normpath(os.path.join(local_path, candidate))
            full_target_path = os.path.normpath(os.path.join(bucket_path, candidate))

            pyarrow.fs.copy_files(
                full_source_path, full_target_path, destination_filesystem=fs
            )


def _ensure_directory(uri: str):
    """Best-effort creation of the directory at a remote URI.

    Some external filesystems require directories to already exist, or at
    least the ``netloc`` to be created (e.g. PyArrow's ``mock://``
    filesystem).

    Generally this should be done before and outside of Ray applications.
    This utility is thus primarily used in testing, e.g. of ``mock://``
    URIs. Any exception raised while creating the directory is ignored.
    """
    target_fs, target_dir = get_fs_and_path(uri)
    try:
        target_fs.create_dir(target_dir)
    except Exception:
        # Deliberately ignored: creating the directory is optional.
        pass