Repository URL to install this package:
|
Version:
3.0.0.dev0 ▾
|
import hashlib
import json
import logging
import os
import runpy
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
import yaml
from filelock import FileLock
import ray
from ray._common.utils import (
get_or_create_event_loop,
try_to_create_directory,
)
from ray._private.runtime_env.conda_utils import (
create_conda_env_if_needed,
delete_conda_env,
get_conda_activate_commands,
get_conda_envs,
get_conda_info_json,
)
from ray._private.runtime_env.context import RuntimeEnvContext
from ray._private.runtime_env.packaging import Protocol, parse_uri
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray._private.runtime_env.validation import parse_and_validate_conda
from ray._private.utils import (
get_directory_size_bytes,
get_master_wheel_url,
get_release_wheel_url,
get_wheel_filename,
)
default_logger = logging.getLogger(__name__)
_WIN32 = os.name == "nt"
def _resolve_current_ray_path() -> str:
# When ray is built from source with pip install -e,
# ray.__file__ returns .../python/ray/__init__.py and this function returns
# ".../python".
# When ray is installed from a prebuilt binary, ray.__file__ returns
# .../site-packages/ray/__init__.py and this function returns
# ".../site-packages".
return os.path.split(os.path.split(ray.__file__)[0])[0]
def _get_ray_setup_spec():
"""Find the Ray setup_spec from the currently running Ray.
This function works even when Ray is built from source with pip install -e.
"""
ray_source_python_path = _resolve_current_ray_path()
setup_py_path = os.path.join(ray_source_python_path, "setup.py")
return runpy.run_path(setup_py_path)["setup_spec"]
def _resolve_install_from_source_ray_dependencies():
"""Find the Ray dependencies when Ray is installed from source."""
deps = (
_get_ray_setup_spec().install_requires + _get_ray_setup_spec().extras["default"]
)
# Remove duplicates
return list(set(deps))
def _inject_ray_to_conda_site(
conda_path, logger: Optional[logging.Logger] = default_logger
):
"""Write the current Ray site package directory to a new site"""
if _WIN32:
python_binary = os.path.join(conda_path, "python")
else:
python_binary = os.path.join(conda_path, "bin/python")
site_packages_path = (
subprocess.check_output(
[
python_binary,
"-c",
"import sysconfig; print(sysconfig.get_paths()['purelib'])",
]
)
.decode()
.strip()
)
ray_path = _resolve_current_ray_path()
logger.warning(
f"Injecting {ray_path} to environment site-packages {site_packages_path} "
"because _inject_current_ray flag is on."
)
maybe_ray_dir = os.path.join(site_packages_path, "ray")
if os.path.isdir(maybe_ray_dir):
logger.warning(f"Replacing existing ray installation with {ray_path}")
shutil.rmtree(maybe_ray_dir)
# See usage of *.pth file at
# https://docs.python.org/3/library/site.html
with open(os.path.join(site_packages_path, "ray_shared.pth"), "w") as f:
f.write(ray_path)
def _current_py_version():
return ".".join(map(str, sys.version_info[:3])) # like 3.6.10
def current_ray_pip_specifier(
logger: Optional[logging.Logger] = default_logger,
) -> Optional[str]:
"""The pip requirement specifier for the running version of Ray.
Returns:
A string which can be passed to `pip install` to install the
currently running Ray version, or None if running on a version
built from source locally (likely if you are developing Ray).
Examples:
Returns "https://s3-us-west-2.amazonaws.com/ray-wheels/[..].whl"
if running a stable release, a nightly or a specific commit
"""
if os.environ.get("RAY_CI_POST_WHEEL_TESTS"):
# Running in Buildkite CI after the wheel has been built.
# Wheels are at in the ray/.whl directory, but use relative path to
# allow for testing locally if needed.
return os.path.join(
Path(ray.__file__).resolve().parents[2], ".whl", get_wheel_filename()
)
elif ray.__commit__ == "{{RAY_COMMIT_SHA}}":
# Running on a version built from source locally.
if os.environ.get("RAY_RUNTIME_ENV_LOCAL_DEV_MODE") != "1":
logger.warning(
"Current Ray version could not be detected, most likely "
"because you have manually built Ray from source. To use "
"runtime_env in this case, set the environment variable "
"RAY_RUNTIME_ENV_LOCAL_DEV_MODE=1."
)
return None
elif "dev" in ray.__version__:
# Running on a nightly wheel.
return get_master_wheel_url()
else:
return get_release_wheel_url()
def inject_dependencies(
conda_dict: Dict[Any, Any],
py_version: str,
pip_dependencies: Optional[List[str]] = None,
) -> Dict[Any, Any]:
"""Add Ray, Python and (optionally) extra pip dependencies to a conda dict.
Args:
conda_dict: A dict representing the JSON-serialized conda
environment YAML file. This dict will be modified and returned.
py_version: A string representing a Python version to inject
into the conda dependencies, e.g. "3.7.7"
pip_dependencies (List[str]): A list of pip dependencies that
will be prepended to the list of pip dependencies in
the conda dict. If the conda dict does not already have a "pip"
field, one will be created.
Returns:
The modified dict. (Note: the input argument conda_dict is modified
and returned.)
"""
if pip_dependencies is None:
pip_dependencies = []
if conda_dict.get("dependencies") is None:
conda_dict["dependencies"] = []
# Inject Python dependency.
deps = conda_dict["dependencies"]
# Add current python dependency. If the user has already included a
# python version dependency, conda will raise a readable error if the two
# are incompatible, e.g:
# ResolvePackageNotFound: - python[version='3.5.*,>=3.6']
deps.append(f"python={py_version}")
if "pip" not in deps:
deps.append("pip")
# Insert pip dependencies.
found_pip_dict = False
for dep in deps:
if isinstance(dep, dict) and dep.get("pip") and isinstance(dep["pip"], list):
dep["pip"] = pip_dependencies + dep["pip"]
found_pip_dict = True
break
if not found_pip_dict:
deps.append({"pip": pip_dependencies})
return conda_dict
def _get_conda_env_hash(conda_dict: Dict) -> str:
# Set `sort_keys=True` so that different orderings yield the same hash.
serialized_conda_spec = json.dumps(conda_dict, sort_keys=True)
hash = hashlib.sha1(serialized_conda_spec.encode("utf-8")).hexdigest()
return hash
def get_uri(runtime_env: Dict) -> Optional[str]:
"""Return `"conda://<hashed_dependencies>"`, or None if no GC required."""
conda = runtime_env.get("conda")
if conda is not None:
if isinstance(conda, str):
# User-preinstalled conda env. We don't garbage collect these, so
# we don't track them with URIs.
uri = None
elif isinstance(conda, dict):
uri = f"conda://{_get_conda_env_hash(conda_dict=conda)}"
else:
raise TypeError(
"conda field received by RuntimeEnvAgent must be "
f"str or dict, not {type(conda).__name__}."
)
else:
uri = None
return uri
def _get_conda_dict_with_ray_inserted(
runtime_env: "RuntimeEnv", # noqa: F821
logger: Optional[logging.Logger] = default_logger,
) -> Dict[str, Any]:
"""Returns the conda spec with the Ray and `python` dependency inserted."""
conda_dict = json.loads(runtime_env.conda_config())
assert conda_dict is not None
ray_pip = current_ray_pip_specifier(logger=logger)
if ray_pip:
extra_pip_dependencies = [ray_pip, "ray[default]"]
elif runtime_env.get_extension("_inject_current_ray"):
extra_pip_dependencies = _resolve_install_from_source_ray_dependencies()
else:
extra_pip_dependencies = []
conda_dict = inject_dependencies(
conda_dict, _current_py_version(), extra_pip_dependencies
)
return conda_dict
class CondaPlugin(RuntimeEnvPlugin):
name = "conda"
def __init__(self, resources_dir: str):
self._resources_dir = os.path.join(resources_dir, "conda")
try_to_create_directory(self._resources_dir)
# It is not safe for multiple processes to install conda envs
# concurrently, even if the envs are different, so use a global
# lock for all conda installs and deletions.
# See https://github.com/ray-project/ray/issues/17086
self._installs_and_deletions_file_lock = os.path.join(
self._resources_dir, "ray-conda-installs-and-deletions.lock"
)
# A set of named conda environments (instead of yaml or dict)
# that are validated to exist.
# NOTE: It has to be only used within the same thread, which
# is an event loop.
# Also, we don't need to GC this field because it is pretty small.
self._validated_named_conda_env = set()
def _get_path_from_hash(self, hash: str) -> str:
"""Generate a path from the hash of a conda or pip spec.
The output path also functions as the name of the conda environment
when using the `--prefix` option to `conda create` and `conda remove`.
Example output:
/tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
/conda/ray-9a7972c3a75f55e976e620484f58410c920db091
"""
return os.path.join(self._resources_dir, hash)
def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]: # noqa: F821
"""Return the conda URI from the RuntimeEnv if it exists, else return []."""
conda_uri = runtime_env.conda_uri()
if conda_uri:
return [conda_uri]
return []
def delete_uri(
self, uri: str, logger: Optional[logging.Logger] = default_logger
) -> int:
"""Delete URI and return the number of bytes deleted."""
logger.info(f"Got request to delete URI {uri}")
protocol, hash = parse_uri(uri)
if protocol != Protocol.CONDA:
raise ValueError(
"CondaPlugin can only delete URIs with protocol "
f"conda. Received protocol {protocol}, URI {uri}"
)
conda_env_path = self._get_path_from_hash(hash)
local_dir_size = get_directory_size_bytes(conda_env_path)
with FileLock(self._installs_and_deletions_file_lock):
successful = delete_conda_env(prefix=conda_env_path, logger=logger)
if not successful:
logger.warning(f"Error when deleting conda env {conda_env_path}. ")
return 0
return local_dir_size
async def create(
self,
uri: Optional[str],
runtime_env: "RuntimeEnv", # noqa: F821
context: RuntimeEnvContext,
logger: logging.Logger = default_logger,
) -> int:
if not runtime_env.has_conda():
return 0
def _create():
result = parse_and_validate_conda(runtime_env.get("conda"))
if isinstance(result, str):
# The conda env name is given.
# In this case, we only verify if the given
# conda env exists.
# If the env is already validated, do nothing.
if result in self._validated_named_conda_env:
return 0
conda_info = get_conda_info_json()
envs = get_conda_envs(conda_info)
# We accept `result` as a conda name or full path.
if not any(result == env[0] or result == env[1] for env in envs):
raise ValueError(
f"The given conda environment '{result}' "
f"from the runtime env {runtime_env} doesn't "
"exist from the output of `conda info --json`. "
"You can only specify an env that already exists. "
f"Please make sure to create an env {result} "
)
self._validated_named_conda_env.add(result)
return 0
logger.debug(
"Setting up conda for runtime_env: " f"{runtime_env.serialize()}"
)
protocol, hash = parse_uri(uri)
conda_env_name = self._get_path_from_hash(hash)
conda_dict = _get_conda_dict_with_ray_inserted(runtime_env, logger=logger)
logger.info(f"Setting up conda environment with {runtime_env}")
with FileLock(self._installs_and_deletions_file_lock):
try:
conda_yaml_file = os.path.join(
self._resources_dir, "environment.yml"
)
with open(conda_yaml_file, "w") as file:
yaml.dump(conda_dict, file)
create_conda_env_if_needed(
conda_yaml_file, prefix=conda_env_name, logger=logger
)
finally:
os.remove(conda_yaml_file)
if runtime_env.get_extension("_inject_current_ray"):
_inject_ray_to_conda_site(conda_path=conda_env_name, logger=logger)
logger.info(f"Finished creating conda environment at {conda_env_name}")
return get_directory_size_bytes(conda_env_name)
loop = get_or_create_event_loop()
return await loop.run_in_executor(None, _create)
def modify_context(
self,
uris: List[str],
runtime_env: "RuntimeEnv", # noqa: F821
context: RuntimeEnvContext,
logger: Optional[logging.Logger] = default_logger,
):
if not runtime_env.has_conda():
return
if runtime_env.conda_env_name():
conda_env_name = runtime_env.conda_env_name()
else:
protocol, hash = parse_uri(runtime_env.conda_uri())
conda_env_name = self._get_path_from_hash(hash)
context.py_executable = "python"
context.command_prefix += get_conda_activate_commands(conda_env_name)