Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / experimental / internal_kv.py
Size: Mime:
from typing import List, Optional, Union

from ray._private.client_mode_hook import client_mode_hook
from ray._raylet import GcsClient

# Module-level state for the internal KV helpers in this file.
# `_initialized` is set to True by `_initialize_internal_kv` and back to False
# by `_internal_kv_reset`; no function in this file reads it.
_initialized = False
# The GCS client that every `_internal_kv_*` helper delegates to. It stays
# None until `_initialize_internal_kv` stores a client here.
global_gcs_client = None


def _internal_kv_reset():
    """Forget the stored GCS client and mark the internal KV uninitialized."""
    global _initialized, global_gcs_client
    global_gcs_client, _initialized = None, False


def internal_kv_get_gcs_client():
    """Return the module-level GCS client.

    The value is ``None`` when no client is currently stored in
    ``global_gcs_client``.
    """
    current_client = global_gcs_client
    return current_client


def _initialize_internal_kv(gcs_client: GcsClient):
    """Register ``gcs_client`` as the client used by the internal KV helpers.

    Args:
        gcs_client: The client to store in the module-level
            ``global_gcs_client``. Must not be ``None``.

    Raises:
        AssertionError: If ``gcs_client`` is ``None`` (when assertions are
            enabled).
    """
    global _initialized, global_gcs_client
    assert gcs_client is not None
    # Store the client and flip the flag together.
    global_gcs_client, _initialized = gcs_client, True


@client_mode_hook
def _internal_kv_initialized():
    """Tell whether a GCS client is currently stored for the internal KV."""
    current_client = global_gcs_client
    return current_client is not None


@client_mode_hook
def _internal_kv_get(
    key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
) -> bytes:
    """Look up the value stored under ``key`` in the internal KV.

    Args:
        key: Key to look up. A ``str`` is encoded to bytes with
            ``str.encode()`` before the lookup.
        namespace: Optional namespace passed to the GCS client. A ``str`` is
            encoded to bytes first; ``None`` and bytes pass through unchanged.

    Returns:
        Whatever the GCS client's ``internal_kv_get`` returns for the key.

    Raises:
        AssertionError: If ``key`` is neither ``str`` nor ``bytes`` (when
            assertions are enabled).
    """
    key_bytes = key.encode() if isinstance(key, str) else key
    ns = namespace.encode() if isinstance(namespace, str) else namespace
    assert isinstance(key_bytes, bytes)
    return global_gcs_client.internal_kv_get(key_bytes, ns)


@client_mode_hook
def _internal_kv_exists(
    key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
) -> bool:
    """Ask the GCS client whether ``key`` is present in the internal KV.

    Args:
        key: Key to check. A ``str`` is encoded to bytes with
            ``str.encode()`` first.
        namespace: Optional namespace passed to the GCS client. A ``str`` is
            encoded to bytes first; ``None`` and bytes pass through unchanged.

    Returns:
        The result of the GCS client's ``internal_kv_exists`` call.

    Raises:
        AssertionError: If ``key`` is neither ``str`` nor ``bytes`` (when
            assertions are enabled).
    """
    key_bytes = key.encode() if isinstance(key, str) else key
    ns = namespace.encode() if isinstance(namespace, str) else namespace
    assert isinstance(key_bytes, bytes)
    return global_gcs_client.internal_kv_exists(key_bytes, ns)


@client_mode_hook
def _pin_runtime_env_uri(uri: str, *, expiration_s: int) -> None:
    """Ask the GCS client to pin the runtime_env ``uri`` for ``expiration_s``.

    Args:
        uri: The runtime_env URI to pin.
        expiration_s: Passed to the GCS client as the pin duration
            (presumably seconds, going by the name -- confirm against
            ``GcsClient.pin_runtime_env_uri``).

    Returns:
        The result of the GCS client's ``pin_runtime_env_uri`` call, passed
        through unchanged.
    """
    pin = global_gcs_client.pin_runtime_env_uri
    return pin(uri, expiration_s)


@client_mode_hook
def _internal_kv_put(
    key: Union[str, bytes],
    value: Union[str, bytes],
    overwrite: bool = True,
    *,
    namespace: Optional[Union[str, bytes]] = None
) -> bool:
    """Store ``value`` under ``key`` in the internal KV via the GCS client.

    Args:
        key: Key to write. A ``str`` is encoded to bytes with
            ``str.encode()`` first.
        value: Value to store. A ``str`` is encoded to bytes first.
        overwrite: Forwarded unchanged to the GCS client's
            ``internal_kv_put``. NOTE(review): presumably controls whether an
            existing value is replaced -- confirm against ``GcsClient``.
        namespace: Optional namespace passed to the GCS client. A ``str`` is
            encoded to bytes first; ``None`` and bytes pass through unchanged.

    Returns:
        already_exists: ``True`` when the GCS client's ``internal_kv_put``
        returned ``0``, which this module reports as "the value already
        existed"; ``False`` otherwise.

    Raises:
        AssertionError: If, after encoding, ``key`` or ``value`` is not
            ``bytes`` or ``overwrite`` is not a ``bool`` (when assertions are
            enabled).
    """
    key_bytes = key.encode() if isinstance(key, str) else key
    value_bytes = value.encode() if isinstance(value, str) else value
    ns = namespace.encode() if isinstance(namespace, str) else namespace

    # Checked in the same order as a single short-circuiting `and` chain.
    assert isinstance(key_bytes, bytes)
    assert isinstance(value_bytes, bytes)
    assert isinstance(overwrite, bool)

    put_result = global_gcs_client.internal_kv_put(
        key_bytes, value_bytes, overwrite, ns
    )
    return put_result == 0


@client_mode_hook
def _internal_kv_del(
    key: Union[str, bytes],
    *,
    del_by_prefix: bool = False,
    namespace: Optional[Union[str, bytes]] = None
) -> int:
    """Delete ``key`` from the internal KV via the GCS client.

    Args:
        key: Key to delete. A ``str`` is encoded to bytes with
            ``str.encode()`` first.
        del_by_prefix: Forwarded unchanged to the GCS client's
            ``internal_kv_del``. NOTE(review): presumably makes ``key`` act
            as a prefix so every matching key is removed -- confirm against
            ``GcsClient``.
        namespace: Optional namespace passed to the GCS client. A ``str`` is
            encoded to bytes first; ``None`` and bytes pass through unchanged.

    Returns:
        The value returned by the GCS client's ``internal_kv_del`` call.

    Raises:
        AssertionError: If ``key`` is neither ``str`` nor ``bytes`` (when
            assertions are enabled).
    """
    key_bytes = key.encode() if isinstance(key, str) else key
    ns = namespace.encode() if isinstance(namespace, str) else namespace
    assert isinstance(key_bytes, bytes)
    return global_gcs_client.internal_kv_del(key_bytes, del_by_prefix, ns)


@client_mode_hook
def _internal_kv_list(
    prefix: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
) -> List[bytes]:
    """Return the internal KV keys that begin with ``prefix``.

    Args:
        prefix: Key prefix to match. A ``str`` is encoded to bytes with
            ``str.encode()`` first; any other value is passed through as-is
            (no type check is performed here).
        namespace: Optional namespace passed to the GCS client. A ``str`` is
            encoded to bytes first; ``None`` and bytes pass through unchanged.

    Returns:
        The result of the GCS client's ``internal_kv_keys`` call.
    """
    prefix_bytes = prefix.encode() if isinstance(prefix, str) else prefix
    ns = namespace.encode() if isinstance(namespace, str) else namespace
    return global_gcs_client.internal_kv_keys(prefix_bytes, ns)