Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / serve / tests / test_util.py
Size: Mime:
import copy
import json
import os
import subprocess
import sys
import tempfile

import numpy as np
import pytest
from fastapi.encoders import jsonable_encoder

import ray
from ray import serve
from ray.serve._private.utils import (
    get_deployment_import_path,
    override_runtime_envs_except_env_vars,
    serve_encoders,
)


def test_bytes_encoder():
    """A bytes value nested in a dict comes back as str after a JSON round trip."""
    payload = {"inp": {"nest": b"bytes"}}
    expected = {"inp": {"nest": "bytes"}}

    round_tripped = json.loads(json.dumps(jsonable_encoder(payload)))
    assert round_tripped == expected


def test_numpy_encoding():
    """NumPy arrays and scalars round-trip through JSON using serve_encoders."""

    def round_trip(value):
        # Encode with Serve's custom encoders, then dump and reload as JSON.
        encoded = jsonable_encoder(value, custom_encoder=serve_encoders)
        return json.loads(json.dumps(encoded))

    expected = [1, 2]
    float_array = np.array(expected).astype(np.float32)
    candidates = [
        float_array,
        float_array.astype(np.int32),
        float_array.astype(np.uint32),
        # A plain Python list holding NumPy int64 scalars.
        [np.int64(1), np.int64(2)],
    ]
    for candidate in candidates:
        assert round_trip(candidate) == expected

    # Arrays nested inside a dict are converted as well.
    assert round_trip({"a": np.array([1, 2])}) == {"a": [1, 2]}


@serve.deployment
def decorated_f(*args):
    """Module-level Serve deployment fixture; ignores its arguments.

    Defined at module scope so the import-path tests in this file can refer
    to it by its qualified name.
    """
    return "reached decorated_f"


@ray.remote
class DecoratedActor:
    """Module-level Ray actor class fixture used by the import-path tests.

    Calling an instance ignores its arguments and returns a fixed string.
    """

    def __call__(self, *args):
        return "reached decorated_actor"


def gen_func():
    """Return a Serve deployment wrapping a function defined inside this call.

    The tests use it as an example of a deployment that is not defined at
    module scope.
    """

    @serve.deployment
    def f():
        pass

    return f


def gen_class():
    """Return a Serve deployment wrapping a class defined inside this call.

    The tests use it as an example of a deployment that is not defined at
    module scope.
    """

    @serve.deployment
    class A:
        pass

    return A


class TestGetDeploymentImportPath:
    """Tests for get_deployment_import_path."""

    def test_invalid_inline_defined(self):
        """Locally defined deployments raise when enforce_importable=True."""

        @serve.deployment
        def inline_f():
            pass

        with pytest.raises(RuntimeError, match="must be importable"):
            get_deployment_import_path(inline_f, enforce_importable=True)

        with pytest.raises(RuntimeError, match="must be importable"):
            get_deployment_import_path(gen_func(), enforce_importable=True)

        @serve.deployment
        class InlineCls:
            pass

        with pytest.raises(RuntimeError, match="must be importable"):
            get_deployment_import_path(InlineCls, enforce_importable=True)

        with pytest.raises(RuntimeError, match="must be importable"):
            get_deployment_import_path(gen_class(), enforce_importable=True)

    def test_get_import_path_basic(self):
        """A module-level function deployment resolves to its dotted path."""
        d = decorated_f.options()

        # CI may change the parent path, so check only that the suffix matches.
        assert get_deployment_import_path(d).endswith(
            "ray.serve.tests.test_util.decorated_f"
        )

    def test_get_import_path_nested_actor(self):
        """A deployment wrapping a Ray actor class resolves to the class path."""
        d = serve.deployment(name="actor")(DecoratedActor)

        # CI may change the parent path, so check only that the suffix matches.
        assert get_deployment_import_path(d).endswith(
            "ray.serve.tests.test_util.DecoratedActor"
        )

    @pytest.mark.skipif(
        sys.platform == "win32", reason="File path incorrect on Windows."
    )
    def test_replace_main(self):
        """With replace_main=True, a script run as __main__ reports its file name.

        The check itself lives inside a generated script that is executed in
        a subprocess; a failing assertion there makes the subprocess exit
        non-zero, which check_output turns into CalledProcessError.
        """

        temp_fname = "testcase.py"
        expected_import_path = "testcase.main_f"

        code = (
            "from ray import serve\n"
            "from ray.serve._private.utils import get_deployment_import_path\n"
            "@serve.deployment\n"
            "def main_f(*args):\n"
            "\treturn 'reached main_f'\n"
            "assert get_deployment_import_path(main_f, replace_main=True) == "
            f"'{expected_import_path}'"
        )

        with tempfile.TemporaryDirectory() as dirpath:
            full_fname = os.path.join(dirpath, temp_fname)

            # The file is only written here, so plain write mode is enough.
            with open(full_fname, "w") as f:
                f.write(code)

            # Use the interpreter running this test rather than whatever
            # "python" resolves to on PATH, so the child process sees the
            # same environment (and the same ray installation).
            subprocess.check_output([sys.executable, full_fname])


class TestOverrideRuntimeEnvsExceptEnvVars:
    """Tests for override_runtime_envs_except_env_vars(parent, child)."""

    def test_merge_empty(self):
        """Merging two empty envs yields only an empty env_vars entry."""
        assert {"env_vars": {}} == override_runtime_envs_except_env_vars({}, {})

    def test_merge_empty_parent(self):
        """With an empty parent, the merged env equals the child."""
        child = {"env_vars": {"test1": "test_val"}, "working_dir": "."}
        assert child == override_runtime_envs_except_env_vars({}, child)

    def test_merge_empty_child(self):
        """With an empty child, the merged env equals the parent."""
        parent = {"env_vars": {"test1": "test_val"}, "working_dir": "."}
        assert parent == override_runtime_envs_except_env_vars(parent, {})

    @pytest.mark.parametrize("invalid_env", [None, 0, "runtime_env", set()])
    def test_invalid_type(self, invalid_env):
        """Non-dict arguments in either position raise TypeError."""
        with pytest.raises(TypeError):
            override_runtime_envs_except_env_vars(invalid_env, {})
        with pytest.raises(TypeError):
            override_runtime_envs_except_env_vars({}, invalid_env)
        with pytest.raises(TypeError):
            override_runtime_envs_except_env_vars(invalid_env, invalid_env)

    def test_basic_merge(self):
        """Child fields override parent fields; inputs are left unmodified."""
        parent = {
            "py_modules": ["http://test.com/test0.zip", "s3://path/test1.zip"],
            "working_dir": "gs://path/test2.zip",
            "env_vars": {"test": "val", "trial": "val2"},
            "pip": ["pandas", "numpy"],
            "excludes": ["my_file.txt"],
        }
        # Deep-copy the snapshot: a shallow dict.copy() shares the nested
        # lists and dicts, so it could not reveal in-place mutation of them.
        original_parent = copy.deepcopy(parent)

        child = {
            "py_modules": [],
            "working_dir": "s3://path/test1.zip",
            "env_vars": {"test": "val", "trial": "val2"},
            "pip": ["numpy"],
        }
        original_child = copy.deepcopy(child)

        merged = override_runtime_envs_except_env_vars(parent, child)

        assert original_parent == parent
        assert original_child == child
        assert merged == {
            "py_modules": [],
            "working_dir": "s3://path/test1.zip",
            "env_vars": {"test": "val", "trial": "val2"},
            "pip": ["numpy"],
            "excludes": ["my_file.txt"],
        }

    def test_merge_deep_copy(self):
        """Check that the env values are actually deep-copied."""

        parent_env_vars = {"parent": "pval"}
        child_env_vars = {"child": "cval"}

        parent = {"env_vars": parent_env_vars}
        child = {"env_vars": child_env_vars}
        # Snapshots must be deep copies; otherwise they alias the same nested
        # env_vars dicts and the equality checks below hold trivially.
        original_parent = copy.deepcopy(parent)
        original_child = copy.deepcopy(child)

        merged = override_runtime_envs_except_env_vars(parent, child)
        assert merged["env_vars"] == {"parent": "pval", "child": "cval"}
        assert original_parent == parent
        assert original_child == child

        # Mutating the merged result must not leak back into either input.
        merged["env_vars"]["extra"] = "val"
        assert original_parent == parent
        assert original_child == child

    def test_merge_empty_env_vars(self):
        """env_vars survive a merge with an env that has none, on either side."""
        env_vars = {"test": "val", "trial": "val2"}

        non_empty = {"env_vars": {"test": "val", "trial": "val2"}}
        empty = {}

        assert (
            env_vars
            == override_runtime_envs_except_env_vars(non_empty, empty)["env_vars"]
        )
        assert (
            env_vars
            == override_runtime_envs_except_env_vars(empty, non_empty)["env_vars"]
        )
        assert {} == override_runtime_envs_except_env_vars(empty, empty)["env_vars"]

    def test_merge_env_vars(self):
        """env_vars are merged key-by-key, with the child winning on conflicts."""
        parent = {
            "py_modules": ["http://test.com/test0.zip", "s3://path/test1.zip"],
            "working_dir": "gs://path/test2.zip",
            "env_vars": {"parent": "pval", "override": "old"},
            "pip": ["pandas", "numpy"],
            "excludes": ["my_file.txt"],
        }

        child = {
            "py_modules": [],
            "working_dir": "s3://path/test1.zip",
            "env_vars": {"child": "cval", "override": "new"},
            "pip": ["numpy"],
        }

        merged = override_runtime_envs_except_env_vars(parent, child)
        assert merged == {
            "py_modules": [],
            "working_dir": "s3://path/test1.zip",
            "env_vars": {"parent": "pval", "child": "cval", "override": "new"},
            "pip": ["numpy"],
            "excludes": ["my_file.txt"],
        }

    def test_inheritance_regression(self):
        """Check if the general Ray runtime_env inheritance behavior matches.

        override_runtime_envs_except_env_vars should match the general Ray
        runtime_env inheritance behavior. This test checks if that behavior
        has changed, which would indicate a regression in
        override_runtime_envs_except_env_vars. If the runtime_env inheritance
        behavior changes, override_runtime_envs_except_env_vars should also
        change to match.
        """

        with ray.init(
            runtime_env={
                "py_modules": [
                    "https://github.com/ray-project/test_dag/archive/"
                    "40d61c141b9c37853a7014b8659fc7f23c1d04f6.zip"
                ],
                "env_vars": {"var1": "hello"},
            }
        ):

            @ray.remote
            def check_module():
                # Check that Ray job's py_module loaded correctly
                from conditional_dag import serve_dag  # noqa: F401

                return os.getenv("var1")

            assert ray.get(check_module.remote()) == "hello"

            @ray.remote(
                runtime_env={
                    "py_modules": [
                        "https://github.com/ray-project/test_deploy_group/archive/"
                        "67971777e225600720f91f618cdfe71fc47f60ee.zip"
                    ],
                    "env_vars": {"var2": "world"},
                }
            )
            def test_task():
                with pytest.raises(ImportError):
                    # Check that Ray job's py_module was overwritten
                    from conditional_dag import serve_dag  # noqa: F401

                from test_env.shallow_import import ShallowClass

                if ShallowClass()() == "Hello shallow world!":
                    return os.getenv("var1") + " " + os.getenv("var2")

            assert ray.get(test_task.remote()) == "hello world"


if __name__ == "__main__":
    import sys

    # Run this file's tests verbosely (-v) with output capture disabled (-s)
    # and hand pytest's exit code back to the calling shell.
    exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(exit_code)