Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / _private / runtime_env / conda_utils.py
Size: Mime:
import logging
import os
import shutil
import subprocess
import hashlib
import json
from typing import Optional, List, Union, Tuple

"""Utilities for conda.  Adapted from https://github.com/mlflow/mlflow."""

# Name of environment variable indicating a path to a conda installation. Ray
# will default to running "conda" if unset.
RAY_CONDA_HOME = "RAY_CONDA_HOME"

# True when running on Windows (``os.name == "nt"``). Used to select
# Windows-specific executable names (".exe"/".bat") and activation syntax.
_WIN32 = os.name == "nt"


def get_conda_activate_commands(conda_env_name: str) -> List[str]:
    """
    Get a list of commands to run to silently activate the given conda env.

    Args:
        conda_env_name: The environment to activate. It is interpolated
            verbatim (unquoted) into the returned command strings.

    Returns:
        A list of shell command strings:

        * On Windows: a single ``conda activate <env>`` command.
        * Otherwise, when ``CONDA_EXE`` or ``RAY_CONDA_HOME`` is set: a
          command sourcing ``etc/profile.d/conda.sh`` (located relative to
          the directory of the conda executable) followed by
          ``conda activate <env>``.
        * Otherwise: a single ``source <activate> <env>`` command.

        The non-Windows activation commands redirect stdout to stderr
        (``1>&2``).
    """
    if _WIN32:
        # No executable lookup is needed here: the command relies on "conda"
        # being resolvable by the shell that runs it.
        return ["conda activate %s" % (conda_env_name)]

    # Checking for newer conda versions
    if "CONDA_EXE" in os.environ or RAY_CONDA_HOME in os.environ:
        conda_path = get_conda_bin_executable("conda")
        return [
            ". {}/../etc/profile.d/conda.sh".format(os.path.dirname(conda_path)),
            "conda activate {} 1>&2".format(conda_env_name),
        ]

    # Use bash command syntax
    activate_path = get_conda_bin_executable("activate")
    return ["source %s %s 1>&2" % (activate_path, conda_env_name)]


def _find_windows_executable(directory: str, executable_name: str) -> Optional[str]:
    """Return the first existing ``<name>.exe`` / ``<name>.bat`` in `directory`, or None."""
    for extension in (".exe", ".bat"):
        candidate = os.path.join(directory, executable_name + extension)
        if os.path.exists(candidate):
            return candidate
    return None


def get_conda_bin_executable(executable_name: str) -> str:
    """
    Return path to the specified executable, assumed to be discoverable within
    a conda installation.

    The conda home directory (expected to contain a 'bin' subdirectory on
    linux) is configurable via the ``RAY_CONDA_HOME`` environment variable. If
    ``RAY_CONDA_HOME`` is unspecified, try the ``CONDA_EXE`` environment
    variable set by activating conda. If neither is specified, this method
    returns `executable_name` (with a ``.bat`` suffix on Windows).

    On Windows, each configured directory is probed for
    ``<executable_name>.exe`` and then ``<executable_name>.bat``; a directory
    containing neither is skipped and the next source is tried. On other
    platforms the path is built without checking that it exists.

    Args:
        executable_name: Base name of the executable, e.g. ``"conda"`` or
            ``"activate"``.

    Returns:
        A path to the executable, or a bare command name when no configured
        location applies.
    """
    conda_home = os.environ.get(RAY_CONDA_HOME)
    if conda_home:
        if not _WIN32:
            return os.path.join(conda_home, "bin/%s" % executable_name)
        found = _find_windows_executable(conda_home, executable_name)
        if found is not None:
            return found
    # Use CONDA_EXE as per https://github.com/conda/conda/issues/7126
    if "CONDA_EXE" in os.environ:
        conda_bin_dir = os.path.dirname(os.environ["CONDA_EXE"])
        if not _WIN32:
            return os.path.join(conda_bin_dir, executable_name)
        # Search next to the conda executable itself. Previously this branch
        # re-probed RAY_CONDA_HOME (or the current working directory) and
        # never looked at the CONDA_EXE directory.
        found = _find_windows_executable(conda_bin_dir, executable_name)
        if found is not None:
            return found
    if _WIN32:
        return executable_name + ".bat"
    return executable_name


def _get_conda_env_name(conda_env_path: str) -> str:
    conda_env_contents = open(conda_env_path).read()
    return "ray-%s" % hashlib.sha1(conda_env_contents.encode("utf-8")).hexdigest()


def create_conda_env_if_needed(
    conda_yaml_file: str, prefix: str, logger: Optional[logging.Logger] = None
) -> None:
    """
    Given a conda YAML, creates a conda environment containing the required
    dependencies if such a conda environment doesn't already exist.

    Args:
        conda_yaml_file: The path to a conda `environment.yml` file.
        prefix: Directory to install the environment into via
            the `--prefix` option to conda create.  This also becomes the name
            of the conda env; i.e. it can be passed into `conda activate` and
            `conda remove`
        logger: Logger that receives progress messages and the streamed
            output of ``conda env create``. Defaults to this module's logger.

    Raises:
        ValueError: If the conda executable could not be launched.
        ShellCommandException: If ``conda env list --json`` exits with a
            non-zero code.
        RuntimeError: If ``conda env create`` exits with a non-zero code. Any
            directory left at `prefix` is removed before raising.
    """
    if logger is None:
        logger = logging.getLogger(__name__)
    conda_path = get_conda_bin_executable("conda")
    try:
        # Probe only: we care whether the binary can be started at all, so a
        # non-zero exit code is ignored here.
        exec_cmd([conda_path, "--help"], throw_on_error=False)
    except OSError as err:
        # OSError covers FileNotFoundError (and is what EnvironmentError
        # aliases); chain it so the underlying cause stays in the traceback.
        raise ValueError(
            f"Could not find Conda executable at '{conda_path}'. "
            "Ensure Conda is installed as per the instructions at "
            "https://conda.io/projects/conda/en/latest/"
            "user-guide/install/index.html. "
            "You can also configure Ray to look for a specific "
            f"Conda executable by setting the {RAY_CONDA_HOME} "
            "environment variable to the path of the Conda executable."
        ) from err

    _, stdout, _ = exec_cmd([conda_path, "env", "list", "--json"])
    envs = json.loads(stdout)["envs"]

    if prefix in envs:
        logger.info(f"Conda environment {prefix} already exists.")
        return

    create_cmd = [
        conda_path,
        "env",
        "create",
        "--file",
        conda_yaml_file,
        "--prefix",
        prefix,
    ]

    logger.info(f"Creating conda environment {prefix}")
    exit_code, output = exec_cmd_stream_to_logger(create_cmd, logger)
    if exit_code != 0:
        # Remove whatever was written so a later attempt starts from scratch.
        if os.path.exists(prefix):
            shutil.rmtree(prefix)
        raise RuntimeError(
            f"Failed to install conda environment {prefix}:\nOutput:\n{output}"
        )


def delete_conda_env(prefix: str, logger: Optional[logging.Logger] = None) -> bool:
    """Remove the conda environment installed at `prefix`.

    Runs ``conda remove -p <prefix> --all -y`` and streams its output to the
    logger.

    Args:
        prefix: Path of the conda environment to remove.
        logger: Logger to report to. Defaults to this module's logger.

    Returns:
        True if the command exited with code 0, False otherwise (the failure
        output is logged at debug level).
    """
    log = logger if logger is not None else logging.getLogger(__name__)
    log.info(f"Deleting conda environment {prefix}")

    remove_cmd = [
        get_conda_bin_executable("conda"),
        "remove",
        "-p",
        prefix,
        "--all",
        "-y",
    ]
    status, output = exec_cmd_stream_to_logger(remove_cmd, log)
    if status == 0:
        return True

    log.debug(f"Failed to delete conda environment {prefix}:\n{output}")
    return False


def get_conda_env_list() -> list:
    """
    Get conda env list.

    Returns:
        The value of the ``"envs"`` key in the JSON printed by
        ``conda env list --json``.

    Raises:
        ValueError: If the conda executable could not be launched.
        ShellCommandException: If ``conda env list --json`` exits with a
            non-zero code.
    """
    conda_path = get_conda_bin_executable("conda")
    try:
        # Probe only: a non-zero exit code from "--help" is ignored.
        exec_cmd([conda_path, "--help"], throw_on_error=False)
    except OSError as err:
        # EnvironmentError is an alias of OSError; chain the original error
        # so the reason the launch failed is preserved.
        raise ValueError(
            f"Could not find Conda executable at {conda_path}."
        ) from err
    _, stdout, _ = exec_cmd([conda_path, "env", "list", "--json"])
    return json.loads(stdout)["envs"]


class ShellCommandException(Exception):
    """Raised by ``exec_cmd`` when a command exits with a non-zero code."""


def exec_cmd(
    cmd: List[str], throw_on_error: bool = True, logger: Optional[logging.Logger] = None
) -> Tuple[int, str, str]:
    """
    Runs a command as a child process.

    A convenience wrapper for running a command from a Python script.

    Args:
        cmd: the command to run, as a list of strings
        throw_on_error: if true, raises an Exception if the exit code of the
            program is nonzero
        logger: accepted for call-site compatibility; currently unused.

    Returns:
        A tuple ``(exit_code, stdout, stderr)`` where both streams are
        captured in full and decoded as text.

    Raises:
        ShellCommandException: If `throw_on_error` is true and the exit code
            is nonzero. The message includes the captured stdout and stderr.
        OSError: If the command could not be started (e.g. the executable
            does not exist).
    """
    child = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
        universal_newlines=True,
    )
    # communicate() drains both pipes, closes stdin and waits for the process
    # to exit, so the return code is final once it returns.
    stdout, stderr = child.communicate()
    exit_code = child.returncode
    if throw_on_error and exit_code != 0:
        raise ShellCommandException(
            "Non-zero exit code: %s\n\nSTDOUT:\n%s\n\nSTDERR:%s"
            % (exit_code, stdout, stderr)
        )
    return exit_code, stdout, stderr


def exec_cmd_stream_to_logger(
    cmd: List[str], logger: logging.Logger, n_lines: int = 50, **kwargs
) -> Tuple[int, str]:
    """Runs a command as a child process, streaming output to the logger.

    The last n_lines lines of output are also returned (stdout and stderr).

    Args:
        cmd: The command to run, as a list of strings.
        logger: Each non-blank output line is logged to it at INFO level,
            with surrounding whitespace stripped.
        n_lines: Maximum number of trailing output lines to return. Values
            of zero or less return no lines.
        **kwargs: Extra keyword arguments forwarded to ``subprocess.Popen``.

    Returns:
        A tuple ``(exit_code, output)`` where ``output`` is the retained
        lines joined with newlines. stderr is merged into stdout.

    Raises:
        ValueError: On Windows, if an ``env`` mapping is supplied without a
            ``PATH`` entry (matched case-insensitively).
    """
    from collections import deque

    # Validate the keys of the env mapping itself (not the kwargs names).
    env = kwargs.get("env")
    if env is not None and _WIN32 and "PATH" not in [x.upper() for x in env]:
        raise ValueError("On windows, Popen requires 'PATH' in 'env'")
    child = subprocess.Popen(
        cmd,
        universal_newlines=True,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        **kwargs,
    )
    # A bounded deque keeps only the most recent lines.
    last_n_lines = deque(maxlen=max(n_lines, 0))
    with child.stdout:
        # Read until EOF rather than stopping as soon as the process has
        # exited: output that is still buffered in the pipe at exit time
        # (typically the error message of a failing command) must not be lost.
        for line in child.stdout:
            line = line.strip()
            if not line:
                continue
            last_n_lines.append(line)
            logger.info(line)

    exit_code = child.wait()
    return exit_code, "\n".join(last_n_lines)