Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / _private / accelerators / neuron.py
Size: Mime:
import json
import logging
import os
import subprocess
import sys
from typing import List, Optional, Tuple

from ray._private.accelerators.accelerator import AcceleratorManager

logger = logging.getLogger(__name__)

NEURON_RT_VISIBLE_CORES_ENV_VAR = "NEURON_RT_VISIBLE_CORES"
NOSET_AWS_NEURON_RT_VISIBLE_CORES_ENV_VAR = (
    "RAY_EXPERIMENTAL_NOSET_NEURON_RT_VISIBLE_CORES"
)

# https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/inf2-arch.html#aws-inf2-arch
# https://awsdocs-neuron.readthedocs-hosted.com/en/latest/general/arch/neuron-hardware/trn1-arch.html#aws-trn1-arch
# Subject to removal after the information is available via public API
AWS_NEURON_INSTANCE_MAP = {
    "trn1.2xlarge": 2,
    "trn1.32xlarge": 32,
    "trn1n.32xlarge": 32,
    "inf2.xlarge": 2,
    "inf2.8xlarge": 2,
    "inf2.24xlarge": 12,
    "inf2.48xlarge": 24,
}


class NeuronAcceleratorManager(AcceleratorManager):
    """AWS Inferentia and Trainium accelerators."""

    @staticmethod
    def get_resource_name() -> str:
        return "neuron_cores"

    @staticmethod
    def get_visible_accelerator_ids_env_var() -> str:
        return NEURON_RT_VISIBLE_CORES_ENV_VAR

    @staticmethod
    def get_current_process_visible_accelerator_ids() -> Optional[List[str]]:
        neuron_visible_cores = os.environ.get(
            NeuronAcceleratorManager.get_visible_accelerator_ids_env_var(), None
        )

        if neuron_visible_cores is None:
            return None

        if neuron_visible_cores == "":
            return []

        return list(neuron_visible_cores.split(","))

    @staticmethod
    def get_current_node_num_accelerators() -> int:
        """
        Attempt to detect the number of Neuron cores on this machine.

        Returns:
            The number of Neuron cores if any were detected, otherwise 0.
        """
        nc_count: int = 0
        neuron_path = "/opt/aws/neuron/bin/"
        if sys.platform.startswith("linux") and os.path.isdir(neuron_path):
            result = subprocess.run(
                [os.path.join(neuron_path, "neuron-ls"), "--json-output"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            if result.returncode == 0 and result.stdout:
                neuron_devices = json.loads(result.stdout)
                for neuron_device in neuron_devices:
                    nc_count += neuron_device.get("nc_count", 0)
        return nc_count

    @staticmethod
    def get_current_node_accelerator_type() -> Optional[str]:
        from ray.util.accelerators import AWS_NEURON_CORE

        return AWS_NEURON_CORE

    @staticmethod
    def validate_resource_request_quantity(
        quantity: float,
    ) -> Tuple[bool, Optional[str]]:
        if isinstance(quantity, float) and not quantity.is_integer():
            return (
                False,
                f"{NeuronAcceleratorManager.get_resource_name()} resource quantity"
                " must be whole numbers. "
                f"The specified quantity {quantity} is invalid.",
            )
        else:
            return (True, None)

    @staticmethod
    def set_current_process_visible_accelerator_ids(
        visible_neuron_core_ids: List[str],
    ) -> None:
        """Set the NEURON_RT_VISIBLE_CORES environment variable based on
        given visible_neuron_core_ids.

        Args:
            visible_neuron_core_ids (List[str]): List of int representing core IDs.
        """
        if os.environ.get(NOSET_AWS_NEURON_RT_VISIBLE_CORES_ENV_VAR):
            return

        os.environ[
            NeuronAcceleratorManager.get_visible_accelerator_ids_env_var()
        ] = ",".join([str(i) for i in visible_neuron_core_ids])

    @staticmethod
    def get_ec2_instance_num_accelerators(
        instance_type: str, instances: dict
    ) -> Optional[int]:
        # TODO: AWS SDK (public API) doesn't yet expose the NeuronCore
        # information. It will be available (work-in-progress)
        # as xxAcceleratorInfo in InstanceTypeInfo.
        # https://docs.aws.amazon.com/AWSEC2/latest/APIReference/API_InstanceTypeInfo.html
        # See https://github.com/ray-project/ray/issues/38473
        return AWS_NEURON_INSTANCE_MAP.get(instance_type.lower(), None)

    @staticmethod
    def get_ec2_instance_accelerator_type(
        instance_type: str, instances: dict
    ) -> Optional[str]:
        from ray.util.accelerators import AWS_NEURON_CORE

        return AWS_NEURON_CORE