Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / _private / label_utils.py
Size: Mime:
import json
import re
from typing import (
    Any,
    Dict,
    List,
    Optional,
)

import yaml

import ray._private.ray_constants as ray_constants

# Regex patterns used to validate that labels conform to Kubernetes label syntax rules.
# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set

# Regex for mandatory name (DNS label) or value: one alphanumeric character,
# optionally followed by up to 61 characters from [a-zA-Z0-9_.-] and a final
# alphanumeric character (63 characters at most). The pattern is not anchored,
# so it must be applied with `re.fullmatch` or embedded between `^` and `$`.
# Examples:
#   Valid matches: "a", "label-name", "a-._b", "123", "this_is_a_valid_label"
#   Invalid matches: "-abc", "abc-", "my@label"
LABEL_REGEX = re.compile(r"([a-zA-Z0-9]([a-zA-Z0-9_.-]{0,61}[a-zA-Z0-9])?)")

# Regex for optional prefix (DNS subdomain): dot-separated segments, each
# matched by LABEL_REGEX. Unlike LABEL_REGEX this is a plain pattern string,
# not a compiled pattern.
# Examples:
#   Valid matches: "abc", "sub.domain.example", "my-label", "123.456.789"
#   Invalid matches: "-abc", "prefix_", "sub.$$.example"
# NOTE(review): every segment is optional (`?`) and LABEL_REGEX itself allows
# interior dots, so this pattern also accepts "" and "sub..domain", which are
# not valid DNS subdomains -- confirm whether that is intended.
LABEL_PREFIX_REGEX = rf"^({LABEL_REGEX.pattern}?(\.{LABEL_REGEX.pattern}?)*)$"

# Supported operators for label selector conditions. Not (!) conditions are handled separately.
LABEL_OPERATORS = {"in"}
# Create a pattern string dynamically based on the LABEL_OPERATORS: an
# alternation (`|`) of the regex-escaped operator names.
OPERATOR_PATTERN = "|".join([re.escape(operator) for operator in LABEL_OPERATORS])

# Regex to match valid label selector operators and values. Two alternatives:
#   1. optional "!", optional operator, then a parenthesized list of one or
#      more LABEL_REGEX values separated by "," with at most one following space;
#   2. optional "!" followed by a single LABEL_REGEX value.
# Examples:
#   Valid matches: "spot", "!GPU", "213521", "in(A123, B456, C789)", "!in(spot, on-demand)", "valid-value"
#   Invalid matches: "-spot", "spot_", "in()", "in(spot,", "in(H100, TPU!GPU)", "!!!in(H100, TPU)"
# NOTE(review): the operator group is optional, so a bare parenthesized list
# such as "(A, B)" also matches -- confirm whether that is intended.
LABEL_SELECTOR_REGEX = re.compile(
    rf"^!?(?:{OPERATOR_PATTERN})?\({LABEL_REGEX.pattern}(?:, ?{LABEL_REGEX.pattern})*\)$|^!?{LABEL_REGEX.pattern}$"
)


def parse_node_labels_json(labels_json: str) -> Dict[str, str]:
    """Parse node labels from a JSON string.

    Args:
        labels_json: JSON text that is expected to decode to an object whose
            keys and values are all strings.

    Returns:
        The decoded label dictionary.

    Raises:
        ValueError: If the text is not valid JSON, the decoded value is not a
            dict, any key or value is not a string, or `validate_node_labels`
            rejects one of the keys.
    """
    decoded = json.loads(labels_json)
    if not isinstance(decoded, dict):
        raise ValueError("The format after deserialization is not a key-value pair map")

    for key, value in decoded.items():
        # Report a bad key before a bad value for the same entry.
        if not isinstance(key, str):
            raise ValueError("The key is not string type.")
        if not isinstance(value, str):
            raise ValueError(f'The value of the "{key}" is not string type')

    # Custom node labels must not use the prefix reserved for Ray's own labels.
    validate_node_labels(decoded)
    return decoded


def parse_node_labels_string(labels_str: str) -> Dict[str, str]:
    """Parse node labels from a comma-separated `key=value` string.

    One pair of surrounding double quotes is stripped first. Keys and values
    have surrounding whitespace removed. If a key is repeated, the last value
    wins.

    Args:
        labels_str: Text such as `region=us-west,tier=prod`; may be empty.

    Returns:
        The parsed label dictionary (empty for an empty string).

    Raises:
        ValueError: If any comma-separated item does not contain exactly one
            `=`, or if `validate_node_label_syntax` rejects a key or value.
    """
    has_surrounding_quotes = (
        len(labels_str) > 1 and labels_str.startswith('"') and labels_str.endswith('"')
    )
    if has_surrounding_quotes:
        labels_str = labels_str[1:-1]

    parsed: Dict[str, str] = {}
    if labels_str == "":
        return parsed

    for item in labels_str.split(","):
        pieces = item.split("=")
        # Exactly one "=" is required: a key on the left, a value on the right.
        if len(pieces) != 2:
            raise ValueError("Label string is not a key-value pair.")
        raw_key, raw_value = pieces
        parsed[raw_key.strip()] = raw_value.strip()

    # Labels must follow Kubernetes label syntax.
    validate_node_label_syntax(parsed)
    return parsed


def parse_node_labels_from_yaml_file(path: str) -> Dict[str, str]:
    """Parse node labels from a YAML file.

    Args:
        path: Path of the YAML file to read. An empty string means "no file"
            and yields an empty dictionary.

    Returns:
        The label dictionary loaded from the file.

    Raises:
        ValueError: If the YAML content is not a mapping, any key or value is
            not a string, or `validate_node_label_syntax` rejects an entry.
    """
    if path == "":
        return {}

    # Expects valid YAML content.
    with open(path, "r") as stream:
        loaded = yaml.safe_load(stream)

    if not isinstance(loaded, dict):
        raise ValueError(
            "The format after deserialization is not a key-value pair map."
        )
    for key, value in loaded.items():
        if not isinstance(key, str):
            raise ValueError("The key is not string type.")
        if not isinstance(value, str):
            raise ValueError(f'The value of "{key}" is not string type.')

    # Labels must follow Kubernetes label syntax.
    validate_node_label_syntax(loaded)
    return loaded


# TODO (ryanaoleary@): This function will be removed after the migration to the label
# selector API from NodeLabelSchedulingPolicy is complete.
def validate_node_labels(labels: Dict[str, str]):
    """Reject custom label keys that use the prefix reserved for Ray labels.

    Args:
        labels: Label dictionary to check; `None` is accepted and ignored.

    Raises:
        ValueError: If any key starts with
            `ray_constants.RAY_DEFAULT_LABEL_KEYS_PREFIX`.
    """
    if labels is None:
        return
    for key in labels:
        if not key.startswith(ray_constants.RAY_DEFAULT_LABEL_KEYS_PREFIX):
            continue
        raise ValueError(
            f"Custom label keys `{key}` cannot start with the prefix "
            f"`{ray_constants.RAY_DEFAULT_LABEL_KEYS_PREFIX}`. "
            f"This is reserved for Ray defined labels."
        )


def validate_label_key(key: str) -> Optional[str]:
    """Check that a label key follows the expected label-key syntax.

    A key is either `name` or `prefix/name`; when a "/" is present the key is
    split on its last "/". The prefix is checked against LABEL_PREFIX_REGEX
    (253 characters at most) and the name against LABEL_REGEX (63 characters
    at most).

    Args:
        key: The label key to validate.

    Returns:
        An error message describing the first problem found, or `None` if
        the key is valid.
    """
    if "/" in key:
        prefix, name = key.rsplit("/", 1)
        if len(prefix) > 253 or not re.fullmatch(LABEL_PREFIX_REGEX, prefix):
            return (
                f"Invalid label key prefix `{prefix}`. Prefix must be a series of DNS labels "
                f"separated by dots (.), not longer than 253 characters in total."
            )
    else:
        name = key
    if len(name) > 63 or not re.fullmatch(LABEL_REGEX, name):
        # Each fragment ends with a space so the joined message reads
        # "underscores (_), dots (.)" rather than "underscores (_),dots (.)".
        return (
            f"Invalid label key name `{name}`. Name must be 63 chars or less beginning and ending "
            f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), "
            f"dots (.), and alphanumerics between."
        )
    return None


def validate_label_value(value: str) -> None:
    """Check that a label value follows the expected label-value syntax.

    The empty string is accepted. Any other value must be 63 characters or
    fewer and fully match LABEL_REGEX.

    Args:
        value: The label value to validate.

    Raises:
        ValueError: If the value is too long or does not match LABEL_REGEX.
    """
    if value == "":
        return
    if len(value) > 63 or not re.fullmatch(LABEL_REGEX, value):
        # Each fragment ends with a space so the joined message reads
        # "underscores (_), dots (.)" rather than "underscores (_),dots (.)".
        raise ValueError(
            f"Invalid label key value `{value}`. Value must be 63 chars or less beginning and ending "
            f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), "
            f"dots (.), and alphanumerics between."
        )


def validate_label_selector(label_selector: Optional[Dict[str, str]]) -> Optional[str]:
    """Validate every key and value of a label selector dictionary.

    Args:
        label_selector: Mapping of label keys to selector values, or `None`.
            Values that are `None` are not checked.

    Returns:
        The first error message produced by `validate_label_key` or
        `validate_label_selector_value`, or `None` if nothing is wrong.
    """
    if label_selector is None:
        return None

    for selector_key, selector_value in label_selector.items():
        key_error = validate_label_key(selector_key)
        if key_error:
            return key_error

        if selector_value is None:
            continue
        value_error = validate_label_selector_value(selector_value)
        if value_error:
            return value_error

    return None


def validate_label_selector_value(selector: str) -> Optional[str]:
    """Check that a label selector value matches LABEL_SELECTOR_REGEX.

    The empty string is accepted without further checks.

    Args:
        selector: The selector value to validate, e.g. a plain label value,
            a negated value (`!value`), or an operator expression such as
            `in(a, b)`.

    Returns:
        An error message if the selector is invalid, otherwise `None`.
    """
    if selector == "":
        return None
    if not re.fullmatch(LABEL_SELECTOR_REGEX, selector):
        # Each fragment ends with a space so the joined message reads
        # "underscores (_), dots (.)" rather than "underscores (_),dots (.)".
        return (
            f"Invalid label selector value `{selector}`. The label selector value should contain optional operators and a label value. Supported operators are: ! and {LABEL_OPERATORS}. "
            f"Value must be 63 chars or less beginning and ending "
            f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_), "
            f"dots (.), and alphanumerics between."
        )

    return None


# TODO (ryanaoleary@): This function will replace `validate_node_labels` after
# the migration from NodeLabelSchedulingPolicy to the Label Selector API is complete.
def validate_node_label_syntax(labels: Dict[str, str]):
    """Validate that every node label key and value has valid label syntax.

    Args:
        labels: Label dictionary to check; `None` is accepted and ignored.
            Values that are `None` are not checked.

    Raises:
        ValueError: If `validate_label_key` reports an error for a key, or
            `validate_label_value` rejects a value.
    """
    if labels is None:
        return

    for label_key, label_value in labels.items():
        key_error = validate_label_key(label_key)
        if key_error:
            raise ValueError(key_error)

        if label_value is None:
            continue
        validate_label_value(label_value)


def validate_fallback_strategy(
    fallback_strategy: Optional[List[Dict[str, Any]]]
) -> Optional[str]:
    """Validate the structure of a `fallback_strategy` scheduling option.

    Each element must be a non-empty dictionary containing only supported
    options (currently just `label_selector`). A `label_selector` that is not
    `None` must be a dictionary and must pass `validate_label_selector`.

    Args:
        fallback_strategy: List of strategy dictionaries, or `None`.

    Returns:
        An error message describing the first problem found, or `None` if
        the strategy list is valid.
    """
    if fallback_strategy is None:
        return None

    # Supported options for `fallback_strategy` scheduling.
    supported_options = {"label_selector"}

    for strategy in fallback_strategy:
        if not isinstance(strategy, dict):
            return "Each element in fallback_strategy must be a dictionary."

        if not strategy:
            return "Empty dictionary found in `fallback_strategy`."

        # Validate `fallback_strategy` only contains supported options.
        for option in strategy:
            if option not in supported_options:
                return (
                    f"Unsupported option found: '{option}'. "
                    f"Only {list(supported_options)} is currently supported."
                )

        # Validate the 'label_selector' dictionary.
        label_selector = strategy.get("label_selector")
        if label_selector is None:
            continue

        # Check the type before anything else: a truthiness test here would
        # let falsy non-dict values such as [], "" or 0 skip validation.
        if not isinstance(label_selector, dict):
            return 'The value of "label_selector" must be a dictionary.'

        error_message = validate_label_selector(label_selector)
        if error_message:
            return error_message

    return None