# ray/purelib/ray/serve/benchmarks/scalability.py
# A multi-node scalability test. We put an HTTP proxy on the head node, spin up
# 20 nodes, place as many replicas as possible on the cluster, and run a stress
# test.
#
# The test measures latency and throughput under high load using `wrk` running
# on each node.
#
# Results for node 1 of 21:
# Running 10s test @ http://127.0.0.1:8000/hey
#   2 threads and 63 connections
#   Thread Stats   Avg      Stdev     Max   +/- Stdev
#     Latency   263.96ms   96.29ms 506.39ms   69.14%
#     Req/Sec   115.63     79.29   650.00     75.40%
#   2307 requests in 10.00s, 315.66KB read
# Requests/sec:    230.61
# Transfer/sec:     31.55KB
#
# Results for node 2 of 21:
# Running 10s test @ http://127.0.0.1:8000/hey
#   2 threads and 63 connections
#   Thread Stats   Avg      Stdev     Max   +/- Stdev
#     Latency   282.79ms   75.00ms 500.26ms   63.32%
#     Req/Sec   108.20     60.17   240.00     58.92%
#   2159 requests in 10.02s, 295.42KB read
# Requests/sec:    215.47
# Transfer/sec:     29.48KB
#
# [...] similar results for remaining nodes

import logging
import time
import subprocess
import requests

import ray
from ray.util.placement_group import placement_group, remove_placement_group

from ray import serve

logger = logging.getLogger(__file__)

# Attach to the existing cluster; drop any leftover Ray session in this process first.
ray.shutdown()
ray.init(address="auto")

# We ask for more worker nodes than we need and only run on a smaller subset.
# This accounts for worker nodes that fail to launch.
expected_num_nodes = 6
num_replicas = 11
# wrk HTTP load testing config
num_connections = 20
num_threads = 2
time_to_run = "20s"
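# These translate to: wrk -c 20 -t 2 -d 20s http://127.0.0.1:8000/hey (run per node below).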

# Wait until the expected number of nodes have joined the cluster.
while True:
    num_nodes = len(list(filter(lambda node: node["Alive"], ray.nodes())))
    logger.info("Waiting for nodes {}/{}".format(num_nodes, expected_num_nodes))
    if num_nodes >= expected_num_nodes:
        break
    time.sleep(5)

logger.info("Nodes have all joined. There are %s resources.", ray.cluster_resources())

serve.start()  # Start Ray Serve; its HTTP proxy listens on port 8000, the address wrk targets below.

pg = placement_group(
    [{"CPU": 1} for _ in range(expected_num_nodes)], strategy="STRICT_SPREAD"
)
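# Block until every bundle has been reserved; STRICT_SPREAD places each 1-CPU
# bundle on a different node, so we get one reserved slot per node.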
ray.get(pg.ready())


@serve.deployment(num_replicas=num_replicas)
def hey(*args):
    time.sleep(0.01)  # Sleep for 10ms
    return b"hey"


logger.info("Starting %i replicas", num_replicas)
hey.deploy()


# The load-generation task requests no CPUs; it is pinned to a node purely via
# the placement group bundle it is scheduled into below.
@ray.remote(num_cpus=0)
def run_wrk():
    logger.info("Warming up")
    for _ in range(10):
        try:
            resp = requests.get("http://127.0.0.1:8000/hey").text
            logger.info("Received response '" + resp + "'")
            time.sleep(0.5)
        except Exception as e:
            logger.info(f"Got exception {e}")

    result = subprocess.run(
        [
            "wrk",
            "-c",
            str(num_connections),
            "-t",
            str(num_threads),
            "-d",
            time_to_run,
            "http://127.0.0.1:8000/hey",
        ],
        stdout=subprocess.PIPE,
    )
    return result.stdout.decode()


# Launch one wrk task per node, pinning task i to bundle i of the placement
# group (and therefore to a distinct node), then gather the text reports.
results = ray.get(
    [
        run_wrk.options(placement_group=pg, placement_group_bundle_index=i).remote()
        for i in range(expected_num_nodes)
    ]
)

for i in range(expected_num_nodes):
    logger.info("Results for node %i of %i:", i + 1, expected_num_nodes)
    logger.info(results[i])

remove_placement_group(pg)  # Clean up: release the CPU bundles reserved for the benchmark.