Repository URL to install this package:
|
Version:
3.0.0.dev0 ▾
|
import json
import re
from typing import (
Any,
Dict,
List,
Optional,
)
import yaml
import ray._private.ray_constants as ray_constants
# Regex patterns used to validate that labels conform to Kubernetes label syntax rules.
# https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#syntax-and-character-set
# Regex for mandatory name (DNS label) or value
# Examples:
# Valid matches: "a", "label-name", "a-._b", "123", "this_is_a_valid_label"
# Invalid matches: "-abc", "abc-", "my@label"
LABEL_REGEX = re.compile(r"([a-zA-Z0-9]([a-zA-Z0-9_.-]{0,61}[a-zA-Z0-9])?)")
# Regex for optional prefix (DNS subdomain)
# Examples:
# Valid matches: "abc", "sub.domain.example", "my-label", "123.456.789"
# Invalid matches: "-abc", "prefix_", "sub..domain", sub.$$.example
LABEL_PREFIX_REGEX = rf"^({LABEL_REGEX.pattern}?(\.{LABEL_REGEX.pattern}?)*)$"
# Supported operators for label selector conditions. Not (!) conditions are handled separately.
LABEL_OPERATORS = {"in"}
# Create a pattern string dynamically based on the LABEL_OPERATORS
OPERATOR_PATTERN = "|".join([re.escape(operator) for operator in LABEL_OPERATORS])
# Regex to match valid label selector operators and values
# Examples:
# Valid matches: "spot", "!GPU", "213521", "in(A123, B456, C789)", "!in(spot, on-demand)", "valid-value"
# Invalid matches: "-spot", "spot_", "in()", "in(spot,", "in(H100, TPU!GPU)", "!!!in(H100, TPU)"
LABEL_SELECTOR_REGEX = re.compile(
rf"^!?(?:{OPERATOR_PATTERN})?\({LABEL_REGEX.pattern}(?:, ?{LABEL_REGEX.pattern})*\)$|^!?{LABEL_REGEX.pattern}$"
)
def parse_node_labels_json(labels_json: str) -> Dict[str, str]:
labels = json.loads(labels_json)
if not isinstance(labels, dict):
raise ValueError("The format after deserialization is not a key-value pair map")
for key, value in labels.items():
if not isinstance(key, str):
raise ValueError("The key is not string type.")
if not isinstance(value, str):
raise ValueError(f'The value of the "{key}" is not string type')
# Validate parsed custom node labels don't begin with ray.io prefix
validate_node_labels(labels)
return labels
def parse_node_labels_string(labels_str: str) -> Dict[str, str]:
labels = {}
# Remove surrounding quotes if they exist
if len(labels_str) > 1 and labels_str.startswith('"') and labels_str.endswith('"'):
labels_str = labels_str[1:-1]
if labels_str == "":
return labels
# Labels argument should consist of a string of key=value pairs
# separated by commas. Labels follow Kubernetes label syntax.
label_pairs = labels_str.split(",")
for pair in label_pairs:
# Split each pair by `=`
key_value = pair.split("=")
if len(key_value) != 2:
raise ValueError("Label string is not a key-value pair.")
key = key_value[0].strip()
value = key_value[1].strip()
labels[key] = value
# Validate parsed node labels follow expected Kubernetes label syntax
validate_node_label_syntax(labels)
return labels
def parse_node_labels_from_yaml_file(path: str) -> Dict[str, str]:
if path == "":
return {}
with open(path, "r") as file:
# Expects valid YAML content
labels = yaml.safe_load(file)
if not isinstance(labels, dict):
raise ValueError(
"The format after deserialization is not a key-value pair map."
)
for key, value in labels.items():
if not isinstance(key, str):
raise ValueError("The key is not string type.")
if not isinstance(value, str):
raise ValueError(f'The value of "{key}" is not string type.')
# Validate parsed node labels follow expected Kubernetes label syntax
validate_node_label_syntax(labels)
return labels
# TODO (ryanaoleary@): This function will be removed after the migration to the label
# selector API from NodeLabelSchedulingPolicy is complete.
def validate_node_labels(labels: Dict[str, str]):
if labels is None:
return
for key in labels.keys():
if key.startswith(ray_constants.RAY_DEFAULT_LABEL_KEYS_PREFIX):
raise ValueError(
f"Custom label keys `{key}` cannot start with the prefix "
f"`{ray_constants.RAY_DEFAULT_LABEL_KEYS_PREFIX}`. "
f"This is reserved for Ray defined labels."
)
def validate_label_key(key: str) -> Optional[str]:
if "/" in key:
prefix, name = key.rsplit("/", 1)
if len(prefix) > 253 or not re.fullmatch(LABEL_PREFIX_REGEX, prefix):
return str(
f"Invalid label key prefix `{prefix}`. Prefix must be a series of DNS labels "
f"separated by dots (.), not longer than 253 characters in total."
)
else:
name = key
if len(name) > 63 or not re.fullmatch(LABEL_REGEX, name):
return str(
f"Invalid label key name `{name}`. Name must be 63 chars or less beginning and ending "
f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),"
f"dots (.), and alphanumerics between."
)
return None
def validate_label_value(value: str):
if value == "":
return
if len(value) > 63 or not re.fullmatch(LABEL_REGEX, value):
raise ValueError(
f"Invalid label key value `{value}`. Value must be 63 chars or less beginning and ending "
f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),"
f"dots (.), and alphanumerics between."
)
def validate_label_selector(label_selector: Optional[Dict[str, str]]) -> Optional[str]:
if label_selector is None:
return None
for key, value in label_selector.items():
possible_error_message = validate_label_key(key)
if possible_error_message:
return possible_error_message
if value is not None:
possible_error_message = validate_label_selector_value(value)
if possible_error_message:
return possible_error_message
return None
def validate_label_selector_value(selector: str) -> Optional[str]:
if selector == "":
return None
if not re.fullmatch(LABEL_SELECTOR_REGEX, selector):
return str(
f"Invalid label selector value `{selector}`. The label selector value should contain optional operators and a label value. Supported operators are: ! and {LABEL_OPERATORS}. "
f"Value must be 63 chars or less beginning and ending "
f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),"
f"dots (.), and alphanumerics between."
)
return None
# TODO (ryanaoleary@): This function will replace `validate_node_labels` after
# the migration from NodeLabelSchedulingPolicy to the Label Selector API is complete.
def validate_node_label_syntax(labels: Dict[str, str]):
if labels is None:
return
for key, value in labels.items():
possible_error_message = validate_label_key(key)
if possible_error_message:
raise ValueError(possible_error_message)
if value is not None:
validate_label_value(value)
def validate_fallback_strategy(
fallback_strategy: Optional[List[Dict[str, Any]]]
) -> Optional[str]:
if fallback_strategy is None:
return None
# Supported options for `fallback_strategy` scheduling.
supported_options = {"label_selector"}
for strategy in fallback_strategy:
if not isinstance(strategy, dict):
return "Each element in fallback_strategy must be a dictionary."
if not strategy:
return "Empty dictionary found in `fallback_strategy`."
# Validate `fallback_strategy` only contains supported options.
for option in strategy:
if option not in supported_options:
return (
f"Unsupported option found: '{option}'. "
f"Only {list(supported_options)} is currently supported."
)
# Validate the 'label_selector' dictionary.
label_selector = strategy.get("label_selector")
if label_selector:
if not isinstance(label_selector, dict):
return 'The value of "label_selector" must be a dictionary.'
error_message = validate_label_selector(label_selector)
if error_message:
return error_message
return None