"""This is the script for `ray microbenchmark`."""

import asyncio
import logging
import multiprocessing

import ray
import ray.experimental.channel as ray_channel
from ray._common.utils import (
    get_or_create_event_loop,
)
from ray._private.ray_microbenchmark_helpers import asyncio_timeit, timeit
from ray._private.test_utils import get_actor_node_id
from ray.dag import InputNode, MultiOutputNode
from ray.dag.compiled_dag_node import CompiledDAG
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy

logger = logging.getLogger(__name__)


@ray.remote
class DAGActor:
    def echo(self, x):
        return x

    def echo_multiple(self, *x):
        return x


def check_optimized_build():
    if not ray._raylet.OPTIMIZED:
        msg = (
            "WARNING: Unoptimized build! "
            "To benchmark an optimized build, try:\n"
            "\tbazel run -c opt //:gen_ray_pkg\n"
            "You can also make this permanent by adding\n"
            "\tbuild --compilation_mode=opt\n"
            "to your user-wide ~/.bazelrc file. "
            "(Do not add this to the project-level .bazelrc file.)"
        )
        logger.warning(msg)


def create_driver_actor():
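    # Pin the driver proxy actor to the driver's own node (soft=False makes the
    # affinity strict), so channel reads against it stay on the same node.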
    return CompiledDAG.DAGDriverProxyActor.options(
        scheduling_strategy=NodeAffinitySchedulingStrategy(
            ray.get_runtime_context().get_node_id(), soft=False
        )
    ).remote()


def main(results=None):
    results = results or []
    loop = get_or_create_event_loop()

    check_optimized_build()

    print("Tip: set TESTS_TO_RUN='pattern' to run a subset of benchmarks")

    #################################################
    # Perf tests for channels, used in compiled DAGs.
    #################################################
    ray.init()

    def put_channel_small(chans, do_get=False):
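        # Write a 1-byte payload to each channel; with do_get=True, immediately
        # read it back to time a full local put/get round trip.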
        for chan in chans:
            chan.write(b"0")
            if do_get:
                chan.read()

    @ray.remote
    class ChannelReader:
        def ready(self):
            return

        def read(self, chans):
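            # Drain the channels forever; the driver tears this actor down with
            # ray.kill() once the benchmark that uses it has finished.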
            while True:
                for chan in chans:
                    chan.read()

    driver_actor = create_driver_actor()
    driver_node = get_actor_node_id(driver_actor)
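    # Channel(writer, reader_and_node_list, buffer): writer=None means the
    # driving process is the writer; 1000 is assumed to be the buffer size in
    # bytes for the 1-byte payloads written below.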
    chans = [ray_channel.Channel(None, [(driver_actor, driver_node)], 1000)]
    results += timeit(
        "[unstable] local put:local get, single channel calls",
        lambda: put_channel_small(chans, do_get=True),
    )

    reader = ChannelReader.remote()
    reader_node = get_actor_node_id(reader)
    chans = [ray_channel.Channel(None, [(reader, reader_node)], 1000)]
    ray.get(reader.ready.remote())
    reader.read.remote(chans)
    results += timeit(
        "[unstable] local put:1 remote get, single channel calls",
        lambda: put_channel_small(chans),
    )
    ray.kill(reader)

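    # Use half the logical cores, presumably leaving headroom for the driver
    # and Ray system processes.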
    n_cpu = multiprocessing.cpu_count() // 2
    print(f"Testing multiple readers/channels, n={n_cpu}")

    reader_and_node_list = []
    for _ in range(n_cpu):
        reader = ChannelReader.remote()
        reader_node = get_actor_node_id(reader)
        reader_and_node_list.append((reader, reader_node))
    chans = [ray_channel.Channel(None, reader_and_node_list, 1000)]
    ray.get([reader.ready.remote() for reader, _ in reader_and_node_list])
    for reader, _ in reader_and_node_list:
        reader.read.remote(chans)
    results += timeit(
        "[unstable] local put:n remote get, single channel calls",
        lambda: put_channel_small(chans),
    )
    for reader, _ in reader_and_node_list:
        ray.kill(reader)

    reader = ChannelReader.remote()
    reader_node = get_actor_node_id(reader)
    chans = [
        ray_channel.Channel(None, [(reader, reader_node)], 1000) for _ in range(n_cpu)
    ]
    ray.get(reader.ready.remote())
    reader.read.remote(chans)
    results += timeit(
        "[unstable] local put:1 remote get, n channels calls",
        lambda: put_channel_small(chans),
    )
    ray.kill(reader)

    reader_and_node_list = []
    for _ in range(n_cpu):
        reader = ChannelReader.remote()
        reader_node = get_actor_node_id(reader)
        reader_and_node_list.append((reader, reader_node))
    chans = [
        ray_channel.Channel(None, [reader_and_node], 1000)
        for reader_and_node in reader_and_node_list
    ]
    ray.get([reader.ready.remote() for reader, _ in reader_and_node_list])
    for chan, (reader, _) in zip(chans, reader_and_node_list):
        reader.read.remote([chan])
    results += timeit(
        "[unstable] local put:n remote get, n channels calls",
        lambda: put_channel_small(chans),
    )
    for reader, _ in reader_and_node_list:
        ray.kill(reader)

    # Tests for compiled DAGs.

    def _exec(dag, num_args=1, payload_size=1):
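        # Run the compiled DAG with num_args byte-string args of payload_size
        # bytes each, blocking until the output is ready.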
        output_ref = dag.execute(*[b"x" * payload_size for _ in range(num_args)])
        ray.get(output_ref)

    async def exec_async(tag):
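        # Note: `compiled_dag` is resolved from the enclosing scope at call
        # time, so each rebinding below is picked up automatically.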
        async def _exec_async():
            fut = await compiled_dag.execute_async(b"x")
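            # MultiOutputNode DAGs return a list of futures; await them all.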
            if not isinstance(fut, list):
                await fut
            else:
                await asyncio.gather(*fut)

        return await asyncio_timeit(
            tag,
            _exec_async,
        )

    # Single-actor DAG calls

    a = DAGActor.remote()
    with InputNode() as inp:
        dag = a.echo.bind(inp)

    results += timeit(
        "[unstable] single-actor DAG calls", lambda: ray.get(dag.execute(b"x"))
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        "[unstable] compiled single-actor DAG calls", lambda: _exec(compiled_dag)
    )
    del a

    # Single-actor asyncio DAG calls

    a = DAGActor.remote()
    with InputNode() as inp:
        dag = a.echo.bind(inp)
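    # Compile with asyncio enabled so the DAG exposes execute_async().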
    compiled_dag = dag.experimental_compile(enable_asyncio=True)
    results += loop.run_until_complete(
        exec_async(
            "[unstable] compiled single-actor asyncio DAG calls",
        )
    )
    del a

    # Scatter-gather DAG calls

    n_cpu = multiprocessing.cpu_count() // 2
    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
    results += timeit(
        f"[unstable] scatter-gather DAG calls, n={n_cpu} actors",
        lambda: ray.get(dag.execute(b"x")),
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        f"[unstable] compiled scatter-gather DAG calls, n={n_cpu} actors",
        lambda: _exec(compiled_dag),
    )

    # Scatter-gather asyncio DAG calls

    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = MultiOutputNode([a.echo.bind(inp) for a in actors])
    compiled_dag = dag.experimental_compile(enable_asyncio=True)
    results += loop.run_until_complete(
        exec_async(
            f"[unstable] compiled scatter-gather asyncio DAG calls, n={n_cpu} actors",
        )
    )

    # Chain DAG calls

    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = inp
        for a in actors:
            dag = a.echo.bind(dag)
    results += timeit(
        f"[unstable] chain DAG calls, n={n_cpu} actors",
        lambda: ray.get(dag.execute(b"x")),
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        f"[unstable] compiled chain DAG calls, n={n_cpu} actors",
        lambda: _exec(compiled_dag),
    )

    # Chain asyncio DAG calls

    actors = [DAGActor.remote() for _ in range(n_cpu)]
    with InputNode() as inp:
        dag = inp
        for a in actors:
            dag = a.echo.bind(dag)
    compiled_dag = dag.experimental_compile(enable_asyncio=True)
    results += loop.run_until_complete(
        exec_async(f"[unstable] compiled chain asyncio DAG calls, n={n_cpu} actors")
    )

    # Multiple args with small payloads

    n_actors = 8
    assert (
        n_cpu > n_actors
    ), f"n_cpu ({n_cpu}) must be greater than n_actors ({n_actors})"

    actors = [DAGActor.remote() for _ in range(n_actors)]
    with InputNode() as inp:
        dag = MultiOutputNode([actors[i].echo.bind(inp[i]) for i in range(n_actors)])
    payload_size = 1
    results += timeit(
        f"[unstable] multiple args with small payloads DAG calls, n={n_actors} actors",
        lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_actors)])),
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        f"[unstable] compiled multiple args with small payloads DAG calls, "
        f"n={n_actors} actors",
        lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size),
    )

    # Multiple args with medium payloads

    actors = [DAGActor.remote() for _ in range(n_actors)]
    with InputNode() as inp:
        dag = MultiOutputNode([actors[i].echo.bind(inp[i]) for i in range(n_actors)])
    payload_size = 1024 * 1024
    results += timeit(
        f"[unstable] multiple args with medium payloads DAG calls, n={n_actors} actors",
        lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_actors)])),
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        "[unstable] compiled multiple args with medium payloads DAG calls, "
        f"n={n_actors} actors",
        lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size),
    )

    # Multiple args with large payloads

    actors = [DAGActor.remote() for _ in range(n_actors)]
    with InputNode() as inp:
        dag = MultiOutputNode([actors[i].echo.bind(inp[i]) for i in range(n_actors)])
    payload_size = 10 * 1024 * 1024
    results += timeit(
        f"[unstable] multiple args with large payloads DAG calls, n={n_actors} actors",
        lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_actors)])),
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        "[unstable] compiled multiple args with large payloads DAG calls, "
        f"n={n_actors} actors",
        lambda: _exec(compiled_dag, num_args=n_actors, payload_size=payload_size),
    )

    # Worst case for multiple arguments: a single actor takes all the arguments
    # with small payloads.

    actor = DAGActor.remote()
    n_args = 8
    with InputNode() as inp:
        dag = actor.echo_multiple.bind(*[inp[i] for i in range(n_args)])
    payload_size = 1
    results += timeit(
        "[unstable] single-actor with all args with small payloads DAG calls, "
        "n=1 actors",
        lambda: ray.get(dag.execute(*[b"x" * payload_size for _ in range(n_args)])),
    )
    compiled_dag = dag.experimental_compile()
    results += timeit(
        "[unstable] single-actor with all args with small payloads DAG calls, "
        "n=1 actors",
        lambda: _exec(compiled_dag, num_args=n_args, payload_size=payload_size),
    )

    ray.shutdown()

    return results


if __name__ == "__main__":
    main()