Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / serve / _private / json_serde.py
Size: Mime:
from typing import Any, Union
from importlib import import_module

import json

from ray.dag import (
    DAGNode,
    ClassNode,
    FunctionNode,
    InputNode,
    InputAttributeNode,
    DAGNODE_TYPE_KEY,
)
from ray.serve._private.deployment_executor_node import DeploymentExecutorNode
from ray.serve._private.deployment_method_executor_node import (
    DeploymentMethodExecutorNode,
)
from ray.serve._private.deployment_function_executor_node import (
    DeploymentFunctionExecutorNode,
)

from ray.serve.schema import (
    DeploymentSchema,
)
from ray.serve._private.utils import parse_import_path
from ray.serve.handle import (
    HandleOptions,
    RayServeHandle,
    RayServeDeploymentHandle,
    _serve_handle_to_json_dict,
    _serve_handle_from_json_dict,
)
from ray.serve._private.constants import SERVE_HANDLE_JSON_KEY
from ray.serve.deployment_graph import RayServeDAGHandle


def convert_to_json_safe_obj(obj: Any, *, err_key: str) -> Any:
    """Converts the provided object into a JSON-safe version of it.

    The returned object can safely be `json.dumps`'d to a string.

    Uses the Ray Serve encoder to serialize special objects such as
    ServeHandles and DAGHandles.

    Raises: TypeError if the object contains fields that cannot be
    JSON-serialized.
    """
    try:
        return json.loads(json.dumps(obj, cls=DAGNodeEncoder))
    except Exception as e:
        raise TypeError(
            "All provided fields must be JSON-serializable to build the "
            f"Serve app. Failed while serializing {err_key}:\n{e}"
        )


def convert_from_json_safe_obj(obj: Any, *, err_key: str) -> Any:
    """Converts a JSON-safe object to one that contains Serve special types.

    The provided object should have been serialized using
    convert_to_json_safe_obj. Any special-cased objects such as ServeHandles
    will be recovered on this pass.
    """
    try:
        return json.loads(json.dumps(obj), object_hook=dagnode_from_json)
    except Exception as e:
        raise ValueError(f"Failed to convert {err_key} from JSON:\n{e}")


class DAGNodeEncoder(json.JSONEncoder):
    """
    Custom JSON serializer for DAGNode type that takes care of RayServeHandle
    used in deployment init args or kwargs, as well as all other DAGNode types
    with potentially deeply nested structure with other DAGNode instances.

    Enforcements:
        - All args, kwargs and other_args_to_resolve used in Ray DAG needs to
            be JSON serializable in order to be converted and deployed using
            Ray Serve.
        - All modules such as class or functions need to be visible and
            importable on top of its file, and can be resolved via a fully
            qualified import_path.
        - No DAGNode instance should appear in bound .options(), which should be
            JSON serializable with default encoder.
    """

    def default(self, obj):
        if isinstance(obj, DeploymentSchema):
            return {
                DAGNODE_TYPE_KEY: "DeploymentSchema",
                "schema": obj.dict(),
            }
        elif isinstance(obj, RayServeHandle):
            return _serve_handle_to_json_dict(obj)
        elif isinstance(obj, RayServeDAGHandle):
            # TODO(simon) Do a proper encoder
            return {
                DAGNODE_TYPE_KEY: RayServeDAGHandle.__name__,
                "dag_node_json": obj.dag_node_json,
            }
        elif isinstance(obj, RayServeDeploymentHandle):
            return {
                DAGNODE_TYPE_KEY: RayServeDeploymentHandle.__name__,
                "deployment_name": obj.deployment_name,
                "handle_options_method_name": obj.handle_options.method_name,
            }
        # For all other DAGNode types.
        elif isinstance(obj, DAGNode):
            return obj.to_json()
        else:
            return json.JSONEncoder.default(self, obj)


def dagnode_from_json(input_json: Any) -> Union[DAGNode, RayServeHandle, Any]:
    """
    Decode a DAGNode from given input json dictionary. JSON serialization is
    only used and enforced in ray serve from ray core API authored DAGNode(s).

    Covers both RayServeHandle and DAGNode types.

    Assumptions:
        - User object's JSON dict does not have keys that collide with our
            reserved DAGNODE_TYPE_KEY
        - RayServeHandle and Deployment can be re-constructed without losing
            states needed for their functionality or correctness.
        - DAGNode type can be re-constructed with new stable_uuid upon each
            deserialization without effective correctness of execution.
            - Only exception is ClassNode used as parent of ClassMethodNode
                that we perserve the same parent node.
        - .options() does not contain any DAGNode type
    """
    node_type_to_cls = {
        # Ray DAG Inputs
        InputNode.__name__: InputNode,
        InputAttributeNode.__name__: InputAttributeNode,
        # Deployment graph execution nodes
        DeploymentExecutorNode.__name__: DeploymentExecutorNode,
        DeploymentMethodExecutorNode.__name__: DeploymentMethodExecutorNode,
        DeploymentFunctionExecutorNode.__name__: DeploymentFunctionExecutorNode,
    }
    # Deserialize RayServeHandle type
    if SERVE_HANDLE_JSON_KEY in input_json:
        return _serve_handle_from_json_dict(input_json)
    # Base case for plain objects
    elif DAGNODE_TYPE_KEY not in input_json:
        return input_json
    elif input_json[DAGNODE_TYPE_KEY] == RayServeDAGHandle.__name__:
        return RayServeDAGHandle(input_json["dag_node_json"])
    elif input_json[DAGNODE_TYPE_KEY] == "DeploymentSchema":
        return DeploymentSchema.parse_obj(input_json["schema"])
    elif input_json[DAGNODE_TYPE_KEY] == RayServeDeploymentHandle.__name__:
        return RayServeDeploymentHandle(
            input_json["deployment_name"],
            HandleOptions(input_json["handle_options_method_name"]),
        )
    # Deserialize DAGNode type
    elif input_json[DAGNODE_TYPE_KEY] in node_type_to_cls:
        return node_type_to_cls[input_json[DAGNODE_TYPE_KEY]].from_json(input_json)
    else:
        # Class and Function nodes require original module as body.
        module_name, attr_name = parse_import_path(input_json["import_path"])
        module = getattr(import_module(module_name), attr_name)
        if input_json[DAGNODE_TYPE_KEY] == FunctionNode.__name__:
            return FunctionNode.from_json(input_json, module)
        elif input_json[DAGNODE_TYPE_KEY] == ClassNode.__name__:
            return ClassNode.from_json(input_json, module)