# Package version: 3.0.0.dev0
# (Package-index page residue removed; this file is part of the Ray source tree.)
"""This is the script for `ray microbenchmark`."""
import asyncio
import logging
import multiprocessing
import numpy as np
import ray
from ray._private.ray_client_microbenchmark import main as client_microbenchmark_main
from ray._private.ray_microbenchmark_helpers import timeit
logger = logging.getLogger(__name__)
@ray.remote(num_cpus=0)
class Actor:
    """Synchronous actor used as the callee in the actor-call benchmarks."""

    def small_value(self):
        # Minimal two-byte payload: measures call/return overhead only.
        return b"ok"

    def small_value_arg(self, x):
        # The argument is discarded; only argument-passing cost is exercised.
        return b"ok"

    def small_value_batch(self, n):
        # Fan out n module-level `small_value` tasks and block until all finish.
        refs = []
        for _ in range(n):
            refs.append(small_value.remote())
        ray.get(refs)
@ray.remote
class AsyncActor:
    """Asyncio-based actor used as the callee in the async-actor benchmarks."""

    async def small_value(self):
        # Minimal two-byte payload: measures call/return overhead only.
        return b"ok"

    async def small_value_with_arg(self, x):
        # The argument is discarded; only argument-passing cost is exercised.
        return b"ok"

    async def small_value_batch(self, n):
        # Launch n module-level `small_value` tasks and await them all.
        pending = []
        for _ in range(n):
            pending.append(small_value.remote())
        await asyncio.wait(pending)
@ray.remote(num_cpus=0)
class Client:
    """Driver-side actor that fans a batch of calls out across server actors."""

    def __init__(self, servers):
        # Accept either a single actor handle or a list of handles.
        self.servers = servers if isinstance(servers, list) else [servers]

    def small_value_batch(self, n):
        # n calls per server, submitted server-by-server, then block on all.
        refs = [
            server.small_value.remote()
            for server in self.servers
            for _ in range(n)
        ]
        ray.get(refs)

    def small_value_batch_arg(self, n):
        # Share one plasma object as the argument for every call.
        shared = ray.put(0)
        refs = [
            server.small_value_arg.remote(shared)
            for server in self.servers
            for _ in range(n)
        ]
        ray.get(refs)
@ray.remote
def small_value():
    """Remote task returning a minimal two-byte payload."""
    return b"ok"
@ray.remote
def small_value_batch(n):
    """Fan out n `small_value` tasks from inside a task and wait for all of them."""
    ray.get([small_value.remote() for _ in range(n)])
    return 0
@ray.remote
def create_object_containing_ref():
    """Return a list of 10000 ObjectRefs, each wrapping the integer 1."""
    return [ray.put(1) for _ in range(10000)]
def check_optimized_build():
    """Log a warning when benchmarking against an unoptimized Ray build."""
    # Optimized builds need no message; bail out early.
    if ray._raylet.OPTIMIZED:
        return
    logger.warning(
        "WARNING: Unoptimized build! "
        "To benchmark an optimized build, try:\n"
        "\tbazel run -c opt //:gen_ray_pkg\n"
        "You can also make this permanent by adding\n"
        "\tbuild --compilation_mode=opt\n"
        "to your user-wide ~/.bazelrc file. "
        "(Do not add this to the project-level .bazelrc file.)"
    )
def main(results=None):
    """Run the Ray microbenchmark suite end to end.

    Args:
        results: Optional list that benchmark results are appended to; a
            fresh list is used when omitted.

    Returns:
        The accumulated list of results produced by the `timeit` helper
        (and extended in place by `client_microbenchmark_main`).

    NOTE(review): the third argument to `timeit` is presumably a per-call
    operation count used to normalize throughput — confirm against
    ray._private.ray_microbenchmark_helpers.timeit.
    """
    results = results or []

    check_optimized_build()

    print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks")

    ray.init()

    # Shared anchor object for the repeated plasma-store get benchmark.
    value = ray.put(0)

    def get_small():
        ray.get(value)

    def put_small():
        ray.put(0)

    @ray.remote
    def do_put_small():
        # 100 puts per remote invocation.
        for _ in range(100):
            ray.put(0)

    def put_multi_small():
        ray.get([do_put_small.remote() for _ in range(10)])

    # 100M int64 elements = 800 MiB payload for the "gigabytes" benchmarks.
    arr = np.zeros(100 * 1024 * 1024, dtype=np.int64)

    results += timeit("single client get calls (Plasma Store)", get_small)

    results += timeit("single client put calls (Plasma Store)", put_small)

    results += timeit("multi client put calls (Plasma Store)", put_multi_small, 1000)

    def put_large():
        ray.put(arr)

    results += timeit("single client put gigabytes", put_large, 8 * 0.1)

    # Local shadow of the module-level remote `small_value_batch` task;
    # this one runs on the driver with a fixed batch size of 1000.
    def small_value_batch():
        submitted = [small_value.remote() for _ in range(1000)]
        ray.get(submitted)
        return 0

    results += timeit("single client tasks and get batch", small_value_batch)

    @ray.remote
    def do_put():
        # 10 puts of 80 MiB each per remote invocation.
        for _ in range(10):
            ray.put(np.zeros(10 * 1024 * 1024, dtype=np.int64))

    def put_multi():
        ray.get([do_put.remote() for _ in range(10)])

    results += timeit("multi client put gigabytes", put_multi, 10 * 8 * 0.1)

    # Submitted once; every timed iteration re-fetches the same object.
    obj_containing_ref = create_object_containing_ref.remote()

    def get_containing_object_ref():
        ray.get(obj_containing_ref)

    results += timeit(
        "single client get object containing 10k refs", get_containing_object_ref
    )

    def wait_multiple_refs():
        num_objs = 1000
        not_ready = [small_value.remote() for _ in range(num_objs)]
        # We only need to trigger the fetch_local once for each object,
        # raylet will persist these fetch requests even after ray.wait returns.
        # See https://github.com/ray-project/ray/issues/30375.
        fetch_local = True
        for _ in range(num_objs):
            _ready, not_ready = ray.wait(not_ready, fetch_local=fetch_local)
            if fetch_local:
                fetch_local = False

    results += timeit("single client wait 1k refs", wait_multiple_refs)

    def small_task():
        ray.get(small_value.remote())

    results += timeit("single client tasks sync", small_task)

    def small_task_async():
        ray.get([small_value.remote() for _ in range(1000)])

    results += timeit("single client tasks async", small_task_async, 1000)

    # m actors each fanning out n tasks (via Actor.small_value_batch).
    n = 10000
    m = 4
    actors = [Actor.remote() for _ in range(m)]

    def multi_task():
        submitted = [a.small_value_batch.remote(n) for a in actors]
        ray.get(submitted)

    results += timeit("multi client tasks async", multi_task, n * m)

    # NOTE: `a` is deliberately rebound before each 1:1 benchmark below so
    # every closure captures a fresh actor handle.
    a = Actor.remote()

    def actor_sync():
        ray.get(a.small_value.remote())

    results += timeit("1:1 actor calls sync", actor_sync)

    a = Actor.remote()

    def actor_async():
        ray.get([a.small_value.remote() for _ in range(1000)])

    results += timeit("1:1 actor calls async", actor_async, 1000)

    a = Actor.options(max_concurrency=16).remote()

    def actor_concurrent():
        ray.get([a.small_value.remote() for _ in range(1000)])

    results += timeit("1:1 actor calls concurrent", actor_concurrent, 1000)

    # One client fanning out to half-the-cores worth of server actors.
    n = 5000
    n_cpu = multiprocessing.cpu_count() // 2
    actors = [Actor._remote() for _ in range(n_cpu)]
    client = Client.remote(actors)

    def actor_async_direct():
        ray.get(client.small_value_batch.remote(n))

    results += timeit("1:n actor calls async", actor_async_direct, n * len(actors))

    n_cpu = multiprocessing.cpu_count() // 2
    a = [Actor.remote() for _ in range(n_cpu)]

    @ray.remote
    def work(actors):
        # Round-robin n calls across the actor pool from inside a task.
        ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)])

    def actor_multi2():
        ray.get([work.remote(a) for _ in range(m)])

    results += timeit("n:n actor calls async", actor_multi2, m * n)

    # One Client per server actor; each client passes a shared plasma arg.
    n = 1000
    actors = [Actor._remote() for _ in range(n_cpu)]
    clients = [Client.remote(a) for a in actors]

    def actor_multi2_direct_arg():
        ray.get([c.small_value_batch_arg.remote(n) for c in clients])

    results += timeit(
        "n:n actor calls with arg async", actor_multi2_direct_arg, n * len(clients)
    )

    # Async-actor variants of the benchmarks above.
    a = AsyncActor.remote()

    def actor_sync():
        ray.get(a.small_value.remote())

    results += timeit("1:1 async-actor calls sync", actor_sync)

    a = AsyncActor.remote()

    def async_actor():
        ray.get([a.small_value.remote() for _ in range(1000)])

    results += timeit("1:1 async-actor calls async", async_actor, 1000)

    a = AsyncActor.remote()

    def async_actor():
        ray.get([a.small_value_with_arg.remote(i) for i in range(1000)])

    results += timeit("1:1 async-actor calls with args async", async_actor, 1000)

    n = 5000
    n_cpu = multiprocessing.cpu_count() // 2
    actors = [AsyncActor.remote() for _ in range(n_cpu)]
    client = Client.remote(actors)

    def async_actor_async():
        ray.get(client.small_value_batch.remote(n))

    results += timeit("1:n async-actor calls async", async_actor_async, n * len(actors))

    n = 5000
    m = 4
    n_cpu = multiprocessing.cpu_count() // 2
    a = [AsyncActor.remote() for _ in range(n_cpu)]

    @ray.remote
    def async_actor_work(actors):
        ray.get([actors[i % n_cpu].small_value.remote() for i in range(n)])

    def async_actor_multi():
        ray.get([async_actor_work.remote(a) for _ in range(m)])

    results += timeit("n:n async-actor calls async", async_actor_multi, m * n)

    ray.shutdown()

    ############################
    # End of channel perf tests.
    ############################
    # NOTE(review): no channel perf tests are visible above; this marker
    # appears to be left over from an upstream revision — confirm.

    NUM_PGS = 100
    NUM_BUNDLES = 1
    # Fresh cluster with a custom resource so placement groups can be
    # scheduled without consuming CPUs.
    ray.init(resources={"custom": 100})

    def placement_group_create_removal(num_pgs):
        pgs = [
            ray.util.placement_group(
                bundles=[{"custom": 0.001} for _ in range(NUM_BUNDLES)]
            )
            for _ in range(num_pgs)
        ]
        [pg.wait(timeout_seconds=30) for pg in pgs]
        # Include placement group removal here to clean up.
        # If we don't clean up placement groups, the whole performance
        # gets slower as it runs more.
        # Since timeit function runs multiple times without
        # the cleaning logic, we should have this method here.
        for pg in pgs:
            ray.util.remove_placement_group(pg)

    results += timeit(
        "placement group create/removal",
        lambda: placement_group_create_removal(NUM_PGS),
        NUM_PGS,
    )

    ray.shutdown()

    # Appends the Ray Client benchmark results to the same list.
    client_microbenchmark_main(results)

    return results
if __name__ == "__main__":
    # Run the full benchmark suite when executed as a script.
    main()