Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / _private / gcs_utils.py
Size: Mime:
import logging
from typing import Optional

from ray._private import ray_constants
from ray.core.generated.common_pb2 import ErrorType, JobConfig
from ray.core.generated.gcs_pb2 import (
    ActorTableData,
    AvailableResources,
    ErrorTableData,
    GcsEntry,
    GcsNodeInfo,
    JobTableData,
    PlacementGroupTableData,
    PubSubMessage,
    ResourceDemand,
    ResourceLoad,
    ResourcesData,
    ResourceUsageBatchData,
    TablePrefix,
    TablePubsub,
    TaskEvents,
    TotalResources,
    WorkerTableData,
)

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Public names of this module. Every entry is one of the names imported above
# from ray.core.generated.common_pb2 / ray.core.generated.gcs_pb2, so listing
# them here re-exports those generated types through this module.
__all__ = [
    "ActorTableData",
    "GcsNodeInfo",
    "AvailableResources",
    "TotalResources",
    "JobTableData",
    "JobConfig",
    "ErrorTableData",
    "ErrorType",
    "GcsEntry",
    "ResourceUsageBatchData",
    "ResourcesData",
    "TablePrefix",
    "TablePubsub",
    "TaskEvents",
    "ResourceDemand",
    "ResourceLoad",
    "PubSubMessage",
    "WorkerTableData",
    "PlacementGroupTableData",
]


# Integer codes distinguishing a worker from a driver.
# NOTE(review): presumably these mirror the worker-type values used elsewhere
# in Ray -- confirm before changing them.
WORKER = 0
DRIVER = 1

# Largest gRPC message accepted in either direction: 512MB.
_MAX_MESSAGE_LENGTH = 512 * 1024**2
# Interval between keepalive pings: 60s, expressed in milliseconds.
_GRPC_KEEPALIVE_TIME_MS = 60_000
# A keepalive ping must be answered within 60s, expressed in milliseconds.
_GRPC_KEEPALIVE_TIMEOUT_MS = 60_000

# Channel options used for every GCS channel: the project-wide options first,
# followed by the GCS-specific message-size and keepalive settings.
#
# Two gRPC defaults are relied on as well and deliberately not set here:
#   grpc.keepalive_permit_without_calls=0: No keepalive without inflight calls.
#   grpc.use_local_subchannel_pool=0: Subchannels are shared.
_GRPC_OPTIONS = list(ray_constants.GLOBAL_GRPC_OPTIONS) + [
    ("grpc.max_send_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.max_receive_message_length", _MAX_MESSAGE_LENGTH),
    ("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS),
    ("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS),
]


def create_gcs_channel(address: str, aio=False):
    """Build a gRPC channel to the GCS server.

    The channel is created with this module's ``_GRPC_OPTIONS``.

    Args:
        address: Address of GCS as a string, e.g. ip:port
        aio: When true, ask for a grpc.aio channel instead of a regular one.
    Returns:
        grpc.Channel or grpc.aio.Channel to GCS
    """
    # Imported at call time rather than at module import time.
    from ray._private.grpc_utils import init_grpc_channel

    gcs_channel = init_grpc_channel(
        address,
        options=_GRPC_OPTIONS,
        asynchronous=aio,
    )
    return gcs_channel


class GcsChannel:
    """Pairs a GCS address with the gRPC channel created for it.

    Constructing the object does not open anything: call ``connect()`` to
    create the channel and ``channel()`` to retrieve it afterwards.
    """

    def __init__(self, gcs_address: Optional[str] = None, aio: bool = False):
        """Store the connection settings; no channel is created yet.

        Args:
            gcs_address: GCS address string, e.g. ip:port.
            aio: Whether the channel should be created using grpc.aio.
        """
        self._gcs_address = gcs_address
        self._aio = aio
        # Filled in by connect(). Declared here so the attribute always exists
        # and channel() can explain a missing connect() call instead of
        # failing with "object has no attribute '_channel'".
        self._channel = None

    @property
    def address(self) -> Optional[str]:
        """The GCS address passed to the constructor."""
        return self._gcs_address

    def connect(self) -> None:
        """Create the channel to GCS and keep it on this object."""
        # GCS server uses a cached port, so it should use the same port after
        # restarting. This means GCS address should stay the same for the
        # lifetime of the Ray cluster.
        self._channel = create_gcs_channel(self._gcs_address, self._aio)

    def channel(self):
        """Return the channel created by ``connect()``.

        Raises:
            AttributeError: If ``connect()`` has not been called. The exception
                type is the one callers already got from the missing
                attribute; only the message is more specific.
        """
        if self._channel is None:
            raise AttributeError(
                "GcsChannel has no channel yet; call connect() before channel()"
            )
        return self._channel


def cleanup_redis_storage(
    host: str,
    port: int,
    password: str,
    use_ssl: bool,
    storage_namespace: str,
    username: Optional[str] = None,
):
    """This function is used to cleanup the GCS storage in Redis.
    It supports Redis in cluster and non-cluster modes.

    All arguments are validated first; the native helper from ``ray._raylet``
    is only imported once they have been accepted, so invalid input is
    rejected without loading the extension.

    Args:
       host: The Redis host address.
       port: The Redis port, an integer in the range 0..65535.
       username: The Redis username. ``None`` is treated as an empty string.
       password: The Redis password.
       use_ssl: Whether to encrypt the connection.
       storage_namespace: The namespace of the storage to be deleted.

    Returns:
        The value returned by ``ray._raylet.del_key_prefix_from_storage``.

    Raises:
        ValueError: If ``host``, ``username``, ``password`` or
            ``storage_namespace`` is not a string, or if ``port`` is outside
            0..65535.
        TypeError: If ``port`` is not an integer or ``use_ssl`` is not a
            boolean.
    """
    if not isinstance(host, str):
        raise ValueError("Host must be a string")

    if username is None:
        username = ""

    if not isinstance(username, str):
        raise ValueError("Username must be a string")

    if not isinstance(password, str):
        raise ValueError("Password must be a string")

    # Check the type explicitly: comparing e.g. a str port against an int
    # would otherwise fail with an unrelated "'<' not supported" TypeError.
    if not isinstance(port, int):
        raise TypeError("port must be an integer")

    # Port numbers are 16-bit, so anything above 65535 can never be valid.
    if not 0 <= port <= 65535:
        raise ValueError(f"Invalid port: {port}")

    if not isinstance(use_ssl, bool):
        raise TypeError("use_ssl must be a boolean")

    if not isinstance(storage_namespace, str):
        raise ValueError("storage namespace must be a string")

    # Imported lazily, and only after validation has passed.
    from ray._raylet import del_key_prefix_from_storage  # type: ignore

    # Right now, GCS stores all data in multiple hashes with keys prefixed by
    # storage_namespace. So we only need to delete the specific key prefix to cleanup
    # the cluster's data.
    # Note this deletes all keys with prefix `RAY{key_prefix}@`, not `{key_prefix}`.
    return del_key_prefix_from_storage(
        host, port, username, password, use_ssl, storage_namespace
    )