Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / _private / runtime_env / uv.py
Size: Mime:
"""Util class to install packages via uv.
"""

import asyncio
import hashlib
import json
import logging
import os
import shutil
import sys
from asyncio import create_task, get_running_loop
from typing import Dict, List, Optional

from ray._common.utils import try_to_create_directory
from ray._private.runtime_env import dependency_utils, virtualenv_utils
from ray._private.runtime_env.packaging import Protocol, parse_uri
from ray._private.runtime_env.plugin import RuntimeEnvPlugin
from ray._private.runtime_env.utils import check_output_cmd
from ray._private.utils import get_directory_size_bytes

default_logger = logging.getLogger(__name__)


def _get_uv_hash(uv_dict: Dict) -> str:
    """Get a deterministic hash value for `uv` related runtime envs."""
    serialized_uv_spec = json.dumps(uv_dict, sort_keys=True)
    hash_val = hashlib.sha1(serialized_uv_spec.encode("utf-8")).hexdigest()
    return hash_val


def get_uri(runtime_env: Dict) -> Optional[str]:
    """Return `"uv://<hashed_dependencies>"`, or None if no GC required."""
    uv = runtime_env.get("uv")
    if uv is not None:
        if isinstance(uv, dict):
            uri = "uv://" + _get_uv_hash(uv_dict=uv)
        elif isinstance(uv, list):
            uri = "uv://" + _get_uv_hash(uv_dict=dict(packages=uv))
        else:
            raise TypeError(
                "uv field received by RuntimeEnvAgent must be "
                f"list or dict, not {type(uv).__name__}."
            )
    else:
        uri = None
    return uri


class UvProcessor:
    def __init__(
        self,
        target_dir: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ):
        try:
            import virtualenv  # noqa: F401 ensure virtualenv exists.
        except ImportError:
            raise RuntimeError(
                f"Please install virtualenv "
                f"`{sys.executable} -m pip install virtualenv`"
                f"to enable uv runtime env."
            )

        logger.debug("Setting up uv for runtime_env: %s", runtime_env)
        self._target_dir = target_dir
        # An empty directory is created to execute cmd.
        self._exec_cwd = os.path.join(self._target_dir, "exec_cwd")
        self._runtime_env = runtime_env
        self._logger = logger

        self._uv_config = self._runtime_env.uv_config()
        self._uv_env = os.environ.copy()
        self._uv_env.update(self._runtime_env.env_vars())

    async def _install_uv(
        self, path: str, cwd: str, pip_env: dict, logger: logging.Logger
    ):
        """Before package install, make sure the required version `uv` (if specifieds)
        is installed.
        """
        virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
        python = virtualenv_utils.get_virtualenv_python(path)

        def _get_uv_exec_to_install() -> str:
            """Get `uv` executable with version to install."""
            uv_version = self._uv_config.get("uv_version", None)
            if uv_version:
                return f"uv{uv_version}"
            # Use default version.
            return "uv"

        uv_install_cmd = [
            python,
            "-m",
            "pip",
            "install",
            "--disable-pip-version-check",
            "--no-cache-dir",
            _get_uv_exec_to_install(),
        ]
        logger.info("Installing package uv to %s", virtualenv_path)
        await check_output_cmd(uv_install_cmd, logger=logger, cwd=cwd, env=pip_env)

    async def _check_uv_existence(
        self, path: str, cwd: str, env: dict, logger: logging.Logger
    ) -> bool:
        """Check and return the existence of `uv` in virtual env."""
        python = virtualenv_utils.get_virtualenv_python(path)

        check_existence_cmd = [
            python,
            "-m",
            "uv",
            "version",
        ]

        try:
            # If `uv` doesn't exist, exception will be thrown.
            await check_output_cmd(check_existence_cmd, logger=logger, cwd=cwd, env=env)
            return True
        except Exception:
            return False

    async def _uv_check(sef, python: str, cwd: str, logger: logging.Logger) -> None:
        """Check virtual env dependency compatibility.
        If any incompatibility detected, exception will be thrown.

        param:
            python: the path for python executable within virtual environment.
        """
        cmd = [python, "-m", "uv", "pip", "check"]
        await check_output_cmd(
            cmd,
            logger=logger,
            cwd=cwd,
        )

    async def _install_uv_packages(
        self,
        path: str,
        uv_packages: List[str],
        cwd: str,
        pip_env: Dict,
        logger: logging.Logger,
    ):
        """Install required python packages via `uv`."""
        virtualenv_path = virtualenv_utils.get_virtualenv_path(path)
        python = virtualenv_utils.get_virtualenv_python(path)
        # TODO(fyrestone): Support -i, --no-deps, --no-cache-dir, ...
        requirements_file = dependency_utils.get_requirements_file(path, uv_packages)

        # Check existence for `uv` and see if we could skip `uv` installation.
        uv_exists = await self._check_uv_existence(path, cwd, pip_env, logger)

        # Install uv, which acts as the default package manager.
        if (not uv_exists) or (self._uv_config.get("uv_version", None) is not None):
            await self._install_uv(path, cwd, pip_env, logger)

        # Avoid blocking the event loop.
        loop = get_running_loop()
        await loop.run_in_executor(
            None, dependency_utils.gen_requirements_txt, requirements_file, uv_packages
        )

        # Install all dependencies.
        #
        # Difference with pip:
        # 1. `--disable-pip-version-check` has no effect for uv.
        uv_install_cmd = [
            python,
            "-m",
            "uv",
            "pip",
            "install",
            "-r",
            requirements_file,
        ]

        uv_opt_list = self._uv_config.get("uv_pip_install_options", ["--no-cache"])
        if uv_opt_list:
            uv_install_cmd += uv_opt_list

        logger.info("Installing python requirements to %s", virtualenv_path)
        await check_output_cmd(uv_install_cmd, logger=logger, cwd=cwd, env=pip_env)

        # Check python environment for conflicts.
        if self._uv_config.get("uv_check", False):
            await self._uv_check(python, cwd, logger)

    async def _run(self):
        path = self._target_dir
        logger = self._logger
        uv_packages = self._uv_config["packages"]
        # We create an empty directory for exec cmd so that the cmd will
        # run more stable. e.g. if cwd has ray, then checking ray will
        # look up ray in cwd instead of site packages.
        os.makedirs(self._exec_cwd, exist_ok=True)
        try:
            await virtualenv_utils.create_or_get_virtualenv(
                path, self._exec_cwd, logger
            )
            python = virtualenv_utils.get_virtualenv_python(path)
            async with dependency_utils.check_ray(python, self._exec_cwd, logger):
                # Install packages with uv.
                await self._install_uv_packages(
                    path,
                    uv_packages,
                    self._exec_cwd,
                    self._uv_env,
                    logger,
                )
        except Exception:
            logger.info("Delete incomplete virtualenv: %s", path)
            shutil.rmtree(path, ignore_errors=True)
            logger.exception("Failed to install uv packages.")
            raise

    def __await__(self):
        return self._run().__await__()


class UvPlugin(RuntimeEnvPlugin):
    name = "uv"

    def __init__(self, resources_dir: str):
        self._uv_resource_dir = os.path.join(resources_dir, "uv")
        self._creating_task = {}
        # Maps a URI to a lock that is used to prevent multiple concurrent
        # installs of the same virtualenv, see #24513
        self._create_locks: Dict[str, asyncio.Lock] = {}
        # Key: created hashes. Value: size of the uv dir.
        self._created_hash_bytes: Dict[str, int] = {}
        try_to_create_directory(self._uv_resource_dir)

    def _get_path_from_hash(self, hash_val: str) -> str:
        """Generate a path from the hash of a uv spec.

        Example output:
            /tmp/ray/session_2021-11-03_16-33-59_356303_41018/runtime_resources
                /uv/ray-9a7972c3a75f55e976e620484f58410c920db091
        """
        return os.path.join(self._uv_resource_dir, hash_val)

    def get_uris(self, runtime_env: "RuntimeEnv") -> List[str]:  # noqa: F821
        """Return the uv URI from the RuntimeEnv if it exists, else return []."""
        uv_uri = runtime_env.uv_uri()
        if uv_uri:
            return [uv_uri]
        return []

    def delete_uri(
        self, uri: str, logger: Optional[logging.Logger] = default_logger
    ) -> int:
        """Delete URI and return the number of bytes deleted."""
        logger.info("Got request to delete uv URI %s", uri)
        protocol, hash_val = parse_uri(uri)
        if protocol != Protocol.UV:
            raise ValueError(
                "UvPlugin can only delete URIs with protocol "
                f"uv. Received protocol {protocol}, URI {uri}"
            )

        # Cancel running create task.
        task = self._creating_task.pop(hash_val, None)
        if task is not None:
            task.cancel()

        del self._created_hash_bytes[hash_val]

        uv_env_path = self._get_path_from_hash(hash_val)
        local_dir_size = get_directory_size_bytes(uv_env_path)
        del self._create_locks[uri]
        try:
            shutil.rmtree(uv_env_path)
        except OSError as e:
            logger.warning(f"Error when deleting uv env {uv_env_path}: {str(e)}")
            return 0

        return local_dir_size

    async def create(
        self,
        uri: str,
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: Optional[logging.Logger] = default_logger,
    ) -> int:
        if not runtime_env.has_uv():
            return 0

        protocol, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)

        async def _create_for_hash():
            await UvProcessor(
                target_dir,
                runtime_env,
                logger,
            )

            loop = get_running_loop()
            return await loop.run_in_executor(
                None, get_directory_size_bytes, target_dir
            )

        if uri not in self._create_locks:
            # async lock to prevent the same virtualenv being concurrently installed
            self._create_locks[uri] = asyncio.Lock()

        async with self._create_locks[uri]:
            if hash_val in self._created_hash_bytes:
                return self._created_hash_bytes[hash_val]
            self._creating_task[hash_val] = task = create_task(_create_for_hash())
            task.add_done_callback(lambda _: self._creating_task.pop(hash_val, None))
            uv_dir_bytes = await task
            self._created_hash_bytes[hash_val] = uv_dir_bytes
            return uv_dir_bytes

    def modify_context(
        self,
        uris: List[str],
        runtime_env: "RuntimeEnv",  # noqa: F821
        context: "RuntimeEnvContext",  # noqa: F821
        logger: logging.Logger = default_logger,
    ):
        if not runtime_env.has_uv():
            return
        # UvPlugin only uses a single URI.
        uri = uris[0]
        # Update py_executable.
        protocol, hash_val = parse_uri(uri)
        target_dir = self._get_path_from_hash(hash_val)
        virtualenv_python = virtualenv_utils.get_virtualenv_python(target_dir)

        if not os.path.exists(virtualenv_python):
            raise ValueError(
                f"Local directory {target_dir} for URI {uri} does "
                "not exist on the cluster. Something may have gone wrong while "
                "installing the runtime_env `uv` packages."
            )
        context.py_executable = virtualenv_python
        context.command_prefix += virtualenv_utils.get_virtualenv_activate_command(
            target_dir
        )