Repository URL to install this package:
|
Version:
2.0.0rc1 ▾
|
import pytest
import sys
import ray
from ray import serve
from ray._private.test_utils import run_string_as_driver
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_failure_condition(ray_start, tmp_dir):
# Verify that the test conditions fail without passing the working dir.
with open("hello", "w") as f:
f.write("world")
driver = """
import ray
from ray import serve
ray.init(address="auto")
@serve.deployment
class Test:
def __call__(self, *args):
return open("hello").read()
handle = serve.run(Test.bind())
try:
ray.get(handle.remote())
assert False, "Should not get here"
except FileNotFoundError:
pass
"""
run_string_as_driver(driver)
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_working_dir_basic(ray_start, tmp_dir, ray_shutdown):
with open("hello", "w") as f:
f.write("world")
print("Wrote file")
ray.init(address="auto", namespace="serve", runtime_env={"working_dir": "."})
print("Initialized Ray")
@serve.deployment
class Test:
def __call__(self, *args):
return open("hello").read()
handle = serve.run(Test.bind())
print("Deployed")
assert ray.get(handle.remote()) == "world"
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_working_dir_connect_from_new_driver(ray_start, tmp_dir):
with open("hello", "w") as f:
f.write("world")
driver1 = """
import ray
from ray import serve
job_config = ray.job_config.JobConfig(runtime_env={"working_dir": "."})
ray.init(address="auto", namespace="serve", job_config=job_config)
@serve.deployment
class Test:
def __call__(self, *args):
return open("hello").read()
handle = serve.run(Test.bind())
assert ray.get(handle.remote()) == "world"
"""
run_string_as_driver(driver1)
driver2 = """
import ray
from ray import serve
job_config = ray.job_config.JobConfig(runtime_env={"working_dir": "."})
ray.init(address="auto", namespace="serve", job_config=job_config)
Test = serve.get_deployment("Test")
handle = serve.run(Test.bind())
assert ray.get(handle.remote()) == "world"
Test.delete()
"""
run_string_as_driver(driver2)
@pytest.mark.skipif(sys.platform == "win32", reason="Fail to create temp dir.")
def test_working_dir_scale_up_in_new_driver(ray_start, tmp_dir):
with open("hello", "w") as f:
f.write("world")
driver1 = """
import os
import ray
from ray import serve
job_config = ray.job_config.JobConfig(runtime_env={"working_dir": "."})
ray.init(address="auto", namespace="serve", job_config=job_config)
@serve.deployment(version="1")
class Test:
def __call__(self, *args):
return os.getpid(), open("hello").read()
handle = serve.run(Test.bind())
assert ray.get(handle.remote())[1] == "world"
"""
run_string_as_driver(driver1)
with open("hello", "w") as f:
f.write("no longer world")
driver2 = """
import ray
from ray import serve
job_config = ray.job_config.JobConfig(runtime_env={"working_dir": "."})
ray.init(address="auto", namespace="serve", job_config=job_config)
Test = serve.get_deployment("Test")
handle = serve.run(Test.options(num_replicas=2).bind())
results = ray.get([handle.remote() for _ in range(1000)])
print(set(results))
assert all(r[1] == "world" for r in results), (
"results should still come from the first env")
assert len(set(r[0] for r in results)) == 2, (
"make sure there are two replicas")
Test.delete()
"""
run_string_as_driver(driver2)
if __name__ == "__main__":
import sys
sys.exit(pytest.main(["-sv", __file__]))