Repository URL to install this package:
|
Version:
3.0.0.dev0 ▾
|
from typing import List, Optional, Union
from ray._private.client_mode_hook import client_mode_hook
from ray._raylet import GcsClient
_initialized = False
global_gcs_client = None
def _internal_kv_reset():
global global_gcs_client, _initialized
global_gcs_client = None
_initialized = False
def internal_kv_get_gcs_client():
return global_gcs_client
def _initialize_internal_kv(gcs_client: GcsClient):
"""Initialize the internal KV for use in other function calls."""
global global_gcs_client, _initialized
assert gcs_client is not None
global_gcs_client = gcs_client
_initialized = True
@client_mode_hook
def _internal_kv_initialized():
return global_gcs_client is not None
@client_mode_hook
def _internal_kv_get(
key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
) -> bytes:
"""Fetch the value of a binary key."""
if isinstance(key, str):
key = key.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
assert isinstance(key, bytes)
return global_gcs_client.internal_kv_get(key, namespace)
@client_mode_hook
def _internal_kv_exists(
key: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
) -> bool:
"""Check key exists or not."""
if isinstance(key, str):
key = key.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
assert isinstance(key, bytes)
return global_gcs_client.internal_kv_exists(key, namespace)
@client_mode_hook
def _pin_runtime_env_uri(uri: str, *, expiration_s: int) -> None:
"""Pin a runtime_env URI for expiration_s."""
return global_gcs_client.pin_runtime_env_uri(uri, expiration_s)
@client_mode_hook
def _internal_kv_put(
key: Union[str, bytes],
value: Union[str, bytes],
overwrite: bool = True,
*,
namespace: Optional[Union[str, bytes]] = None
) -> bool:
"""Globally associates a value with a given binary key.
This only has an effect if the key does not already have a value.
Returns:
already_exists: whether the value already exists.
"""
if isinstance(key, str):
key = key.encode()
if isinstance(value, str):
value = value.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
assert (
isinstance(key, bytes)
and isinstance(value, bytes)
and isinstance(overwrite, bool)
)
return global_gcs_client.internal_kv_put(key, value, overwrite, namespace) == 0
@client_mode_hook
def _internal_kv_del(
key: Union[str, bytes],
*,
del_by_prefix: bool = False,
namespace: Optional[Union[str, bytes]] = None
) -> int:
if isinstance(key, str):
key = key.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
assert isinstance(key, bytes)
return global_gcs_client.internal_kv_del(key, del_by_prefix, namespace)
@client_mode_hook
def _internal_kv_list(
prefix: Union[str, bytes], *, namespace: Optional[Union[str, bytes]] = None
) -> List[bytes]:
"""List all keys in the internal KV store that start with the prefix."""
if isinstance(prefix, str):
prefix = prefix.encode()
if isinstance(namespace, str):
namespace = namespace.encode()
return global_gcs_client.internal_kv_keys(prefix, namespace)