Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
ray / purelib / ray / serve / tests / test_autoscaling_policy.py
Size: Mime:
import sys
import time

import pytest
from unittest import mock

from typing import List, Iterable

from ray._private.test_utils import SignalActor, wait_for_condition
from ray.serve._private.autoscaling_policy import (
    BasicAutoscalingPolicy,
    calculate_desired_num_replicas,
)
from ray.serve._private.common import DeploymentInfo
from ray.serve._private.deployment_state import ReplicaState
from ray.serve.config import AutoscalingConfig
from ray.serve._private.constants import CONTROL_LOOP_PERIOD_S
from ray.serve.controller import ServeController
from ray.serve.deployment import Deployment

import ray
from ray import serve
from ray.serve.generated.serve_pb2 import DeploymentRouteList


class TestCalculateDesiredNumReplicas:
    """Unit tests for `calculate_desired_num_replicas`."""

    def test_bounds_checking(self):
        """The result must stay within [min_replicas, max_replicas]."""
        replica_count = 10
        lower_bound, upper_bound = 9, 11
        autoscaling_config = AutoscalingConfig(
            max_replicas=upper_bound,
            min_replicas=lower_bound,
            target_num_ongoing_requests_per_replica=100,
        )

        def desired_for(requests_per_replica):
            # Every replica reports the same number of ongoing requests.
            return calculate_desired_num_replicas(
                autoscaling_config=autoscaling_config,
                current_num_ongoing_requests=[requests_per_replica] * replica_count,
            )

        # Load far above the target is capped at the upper bound ...
        assert desired_for(150) == upper_bound
        # ... and load far below the target is capped at the lower bound.
        assert desired_for(50) == lower_bound

        # Anything in between must still respect both bounds.
        for load in range(50, 150):
            assert lower_bound <= desired_for(load) <= upper_bound

    def test_scale_up(self):
        """Replicas at twice the target load should roughly double in number."""
        autoscaling_config = AutoscalingConfig(
            min_replicas=0, max_replicas=100, target_num_ongoing_requests_per_replica=1
        )
        replica_count = 10
        desired = calculate_desired_num_replicas(
            autoscaling_config=autoscaling_config,
            current_num_ongoing_requests=[2.0] * replica_count,
        )
        # 10 * 2 = 20
        assert 19 <= desired <= 21

    def test_scale_down(self):
        """Replicas at half the target load should roughly halve in number."""
        autoscaling_config = AutoscalingConfig(
            min_replicas=0, max_replicas=100, target_num_ongoing_requests_per_replica=1
        )
        replica_count = 10
        desired = calculate_desired_num_replicas(
            autoscaling_config=autoscaling_config,
            current_num_ongoing_requests=[0.5] * replica_count,
        )
        # 10 * 0.5 = 5
        assert 4 <= desired <= 6

    def test_smoothing_factor(self):
        """A smoothing_factor of 0.5 should halve the size of each adjustment."""
        autoscaling_config = AutoscalingConfig(
            min_replicas=0,
            max_replicas=100,
            target_num_ongoing_requests_per_replica=1,
            smoothing_factor=0.5,
        )
        replica_count = 10

        def desired_for(requests_per_replica):
            return calculate_desired_num_replicas(
                autoscaling_config=autoscaling_config,
                current_num_ongoing_requests=[requests_per_replica] * replica_count,
            )

        # Upscaling: 10 + 0.5 * (40 - 10) = 25
        assert 24 <= desired_for(4.0) <= 26

        # Downscaling: 10 + 0.5 * (2.5 - 10) = 6.25
        assert 5 <= desired_for(0.25) <= 8


def get_running_replicas(controller: ServeController, deployment: Deployment) -> List:
    """Return the replicas of `deployment` that are in the RUNNING state.

    The replica states are fetched from the controller through its
    testing-only `_dump_replica_states_for_testing` method.
    """
    replica_states = ray.get(
        controller._dump_replica_states_for_testing.remote(deployment.name)
    )
    return replica_states.get([ReplicaState.RUNNING])


def get_running_replica_tags(
    controller: ServeController, deployment: Deployment
) -> List:
    """Return the `replica_tag` of every running replica of `deployment`."""
    return [
        running.replica_tag for running in get_running_replicas(controller, deployment)
    ]


def get_num_running_replicas(
    controller: ServeController, deployment: Deployment
) -> int:
    """Return how many replicas of `deployment` are currently running."""
    return len(get_running_replicas(controller, deployment))


def assert_no_replicas_deprovisioned(
    replica_tags_1: Iterable[str], replica_tags_2: Iterable[str]
) -> None:
    """
    Assert that every tag in replica_tags_1 also appears in replica_tags_2.

    A tag that is present at the first timestep but missing at the second is
    taken to mean that the replica was de-provisioned in between. A summary
    of the comparison is printed before the assertion is evaluated.

    replica_tags_1: Replica tags of running replicas at the first timestep
    replica_tags_2: Replica tags of running replicas at the second timestep
    """

    earlier_tags = set(replica_tags_1)
    surviving_tags = earlier_tags & set(replica_tags_2)

    expected_count = len(earlier_tags)
    surviving_count = len(surviving_tags)

    print(
        f"{surviving_count} replica(s) stayed provisioned between "
        f"both deployments. All {expected_count} replica(s) were "
        f"expected to stay provisioned. "
        f"{expected_count - surviving_count} replica(s) were "
        f"de-provisioned."
    )

    assert expected_count == surviving_count


def test_assert_no_replicas_deprovisioned():
    """Sanity-check the `assert_no_replicas_deprovisioned` helper itself."""
    fewer_tags = ["a", "b", "c"]
    more_tags = ["a", "b", "c", "d", "e"]

    # Every tag of the smaller list is still present in the larger one.
    assert_no_replicas_deprovisioned(fewer_tags, more_tags)

    # In the other direction "d" and "e" are missing, so the helper must fail.
    with pytest.raises(AssertionError):
        assert_no_replicas_deprovisioned(more_tags, fewer_tags)


def get_deployment_start_time(controller: ServeController, deployment: Deployment):
    """Return the `start_time_ms` the controller reports for `deployment`.

    The controller's serialized deployment list is parsed into a
    name -> (DeploymentInfo, route) mapping and looked up by deployment name.
    """
    serialized_routes = ray.get(controller.list_deployments.remote())
    route_list = DeploymentRouteList.FromString(serialized_routes)

    info_by_name = {}
    for entry in route_list.deployment_routes:
        name = entry.deployment_info.name
        info = DeploymentInfo.from_proto(entry.deployment_info)
        # An empty route string is represented as None in the mapping.
        route_prefix = entry.route if entry.route != "" else None
        info_by_name[name] = (info, route_prefix)

    matching_info, _ = info_by_name[deployment.name]
    return matching_info.start_time_ms


@pytest.mark.parametrize("min_replicas", [1, 2])
def test_e2e_basic_scale_up_down(min_replicas, serve_instance):
    """Send 100 blocked requests, expect a scale-up, then a scale-down."""

    blocker = SignalActor.remote()

    autoscaling_options = {
        "metrics_interval_s": 0.1,
        "min_replicas": min_replicas,
        "max_replicas": 3,
        "look_back_period_s": 0.2,
        "downscale_delay_s": 0,
        "upscale_delay_s": 0,
    }

    # Many queries are sent below; the short graceful shutdown timeout makes
    # sure replicas are killed quickly during cleanup.
    @serve.deployment(
        autoscaling_config=autoscaling_options,
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    )
    class A:
        def __call__(self):
            # Each request waits on the signal actor before returning.
            ray.get(blocker.wait.remote())

    handle = serve.run(A.bind())

    controller = serve_instance._controller
    initial_start_time = get_deployment_start_time(controller, A)

    def running_count():
        return get_num_running_replicas(controller, A)

    for _ in range(100):
        handle.remote()

    # Expect at least one replica more than min_replicas.
    wait_for_condition(lambda: running_count() >= min_replicas + 1)
    blocker.send.remote()

    # Once the queue drains, the deployment should shrink back down.
    wait_for_condition(lambda: running_count() <= min_replicas)

    # Autoscaling must not have changed the deployment's start time.
    assert get_deployment_start_time(controller, A) == initial_start_time


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_basic_scale_up_down_with_0_replica(serve_instance):
    """Same scale-up/scale-down check, but with min_replicas set to 0."""

    blocker = SignalActor.remote()

    autoscaling_options = {
        "metrics_interval_s": 0.1,
        "min_replicas": 0,
        "max_replicas": 2,
        "look_back_period_s": 0.2,
        "downscale_delay_s": 0,
        "upscale_delay_s": 0,
    }

    # Many queries are sent below; the short graceful shutdown timeout makes
    # sure replicas are killed quickly during cleanup.
    @serve.deployment(
        autoscaling_config=autoscaling_options,
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    )
    class A:
        def __call__(self):
            # Each request waits on the signal actor before returning.
            ray.get(blocker.wait.remote())

    handle = serve.run(A.bind())

    controller = serve_instance._controller
    initial_start_time = get_deployment_start_time(controller, A)

    def running_count():
        return get_num_running_replicas(controller, A)

    for _ in range(100):
        handle.remote()

    # Expect at least one replica to come up from zero.
    wait_for_condition(lambda: running_count() >= 1)
    blocker.send.remote()

    # Once the queue drains, the deployment should shrink back to zero.
    wait_for_condition(lambda: running_count() <= 0)

    # Autoscaling must not have changed the deployment's start time.
    assert get_deployment_start_time(controller, A) == initial_start_time


@mock.patch.object(ServeController, "run_control_loop")
def test_initial_num_replicas(mock, serve_instance):
    """Assert that the initial number of replicas a deployment is launched
    with respects the bounds set by autoscaling_config.

    `ServeController.run_control_loop` is patched out for this test so the
    replica count is checked before the autoscaling procedure is reached.
    """

    replica_bounds = {
        "min_replicas": 2,
        "max_replicas": 4,
    }

    @serve.deployment(autoscaling_config=replica_bounds, version="v1")
    class A:
        def __call__(self):
            return "ok!"

    serve.run(A.bind())

    running = get_num_running_replicas(serve_instance._controller, A)
    assert running == 2


def test_upscale_downscale_delay():
    """Unit test for upscale_delay_s and downscale_delay_s.

    Drives a `BasicAutoscalingPolicy` directly, one decision per simulated
    control-loop period, and checks that:

    * a scale-up / scale-down only happens after enough consecutive
      decisions in the same direction, and
    * a single decision in the opposite direction resets the counter.
    """

    upscale_delay_s = 30.0
    downscale_delay_s = 600.0

    # The config is built from the same variables the wait periods below are
    # derived from, so the two can never drift apart.
    config = AutoscalingConfig(
        min_replicas=0,
        max_replicas=2,
        target_num_ongoing_requests_per_replica=1,
        upscale_delay_s=upscale_delay_s,
        downscale_delay_s=downscale_delay_s,
    )

    policy = BasicAutoscalingPolicy(config)

    upscale_wait_periods = int(upscale_delay_s / CONTROL_LOOP_PERIOD_S)
    downscale_wait_periods = int(downscale_delay_s / CONTROL_LOOP_PERIOD_S)

    overload_requests = [100]
    no_requests = [0, 0]

    def decide(ongoing_requests, target, queued=0):
        """Ask the policy for one decision."""
        return policy.get_decision_num_replicas(
            current_num_ongoing_requests=ongoing_requests,
            curr_target_num_replicas=target,
            current_handle_queued_queries=queued,
        )

    def assert_target_held(ongoing_requests, target, periods):
        """Make `periods` decisions and assert each one keeps `target`."""
        for i in range(periods):
            assert decide(ongoing_requests, target) == target, i

    # Scale up when there are 0 replicas and current_handle_queued_queries > 0
    assert decide([], 0, queued=1) == 1

    # We should scale up only after enough consecutive scale-up decisions.
    assert_target_held(overload_requests, 1, upscale_wait_periods)
    assert decide(overload_requests, 1) == 2

    # We should scale down only after enough consecutive scale-down decisions.
    assert_target_held(no_requests, 2, downscale_wait_periods)
    assert decide(no_requests, 2) == 0

    # Get some scale-up decisions, but not enough to trigger a scale up.
    assert_target_held(overload_requests, 1, int(upscale_wait_periods / 2))

    # Interrupt with a scale-down decision.
    decide([0], 1)

    # The counter should be reset, so it should require `upscale_wait_periods`
    # more periods before we actually scale up.
    assert_target_held(overload_requests, 1, upscale_wait_periods)
    assert decide(overload_requests, 1) == 2

    # Get some scale-down decisions, but not enough to trigger a scale down.
    assert_target_held(no_requests, 2, int(downscale_wait_periods / 2))

    # Interrupt with a scale-up decision.
    decide([100, 100], 2)

    # The counter should be reset so it should require `downscale_wait_periods`
    # more periods before we actually scale down.
    assert_target_held(no_requests, 2, downscale_wait_periods)
    assert decide(no_requests, 2) == 0


def test_replicas_delayed_startup():
    """Unit test simulating replicas taking time to start up."""
    policy = BasicAutoscalingPolicy(
        AutoscalingConfig(
            min_replicas=1,
            max_replicas=200,
            target_num_ongoing_requests_per_replica=1,
            upscale_delay_s=0,
            downscale_delay_s=100000,
        )
    )

    # Each timestep is (current target, ongoing requests reported by the
    # replicas that are up, expected new target).
    timesteps = [
        # One replica with 100 ongoing requests.
        (1, [100], 100),
        # New target is 100, but no new replicas finished spinning up during
        # this timestep.
        (100, [100], 100),
        # Two new replicas spun up during this timestep.
        (100, [100, 20, 3], 123),
        # A lot of queries got drained and a lot of replicas started up, but
        # the target should not decrease, because of the downscale delay.
        (123, [6, 2, 1, 1], 123),
    ]

    for current_target, ongoing, expected_target in timesteps:
        decision = policy.get_decision_num_replicas(current_target, ongoing, 0)
        assert decision == expected_target


@pytest.mark.parametrize("delay_s", [30.0, 0.0])
def test_fluctuating_ongoing_requests(delay_s):
    """
    Simulates a workload that alternates between overload and underload on
    every decision.

    With a non-zero delay the target is expected to stay where it is; with a
    zero delay every decision is expected to flip it.
    """

    policy = BasicAutoscalingPolicy(
        AutoscalingConfig(
            min_replicas=1,
            max_replicas=10,
            target_num_ongoing_requests_per_replica=50,
            upscale_delay_s=delay_s,
            downscale_delay_s=delay_s,
        )
    )

    has_delay = delay_s > 0
    if has_delay:
        wait_periods = int(delay_s / CONTROL_LOOP_PERIOD_S)
        assert wait_periods > 1

    underload_requests, overload_requests = [20, 20], [100]

    # (ongoing requests, current target, expected decision)
    overloaded_step = (overload_requests, 1, 1 if has_delay else 2)
    underloaded_step = (underload_requests, 2, 2 if has_delay else 1)

    for trial in range(1000):
        # Even trials are overloaded, odd trials are underloaded.
        step = overloaded_step if trial % 2 == 0 else underloaded_step
        ongoing, current_target, expected = step
        decision = policy.get_decision_num_replicas(
            current_num_ongoing_requests=ongoing,
            curr_target_num_replicas=current_target,
            current_handle_queued_queries=0,
        )
        assert decision == expected, trial


@pytest.mark.parametrize(
    "ongoing_requests", [[7, 1, 8, 4], [8, 1, 8, 4], [6, 1, 8, 4], [0, 1, 8, 4]]
)
def test_imbalanced_replicas(ongoing_requests):
    """Check the decision when load is spread unevenly across 4 replicas."""
    config = AutoscalingConfig(
        min_replicas=1,
        max_replicas=10,
        target_num_ongoing_requests_per_replica=5,
        upscale_delay_s=0.0,
        downscale_delay_s=0.0,
    )

    policy = BasicAutoscalingPolicy(config)

    per_replica_target = config.target_num_ongoing_requests_per_replica
    average_load = sum(ongoing_requests) / len(ongoing_requests)

    # The policy is queried the same way in every scenario; only the expected
    # answer depends on how the average load compares to the target.
    decision = policy.get_decision_num_replicas(
        current_num_ongoing_requests=ongoing_requests,
        curr_target_num_replicas=4,
        current_handle_queued_queries=0,
    )

    if average_load == per_replica_target:
        # As long as the average number of ongoing requests equals the
        # target, the number of replicas stays the same.
        expected = 4
    elif average_load < per_replica_target:
        # Downscaling: autoscaling uses a ceiling operator, which means a
        # slightly low average is insufficient to downscale.
        expected = 4 if per_replica_target - average_load <= 1 else 3
    else:
        # Upscaling: the average is higher than the target.
        expected = 5

    assert decision == expected


@pytest.mark.parametrize(
    "ongoing_requests", [[20, 0, 0, 0], [100, 0, 0, 0], [10, 0, 0, 0]]
)
def test_single_replica_receives_all_requests(ongoing_requests):
    """One replica holds all the load; the decision should use the total."""
    target_requests = 5

    policy = BasicAutoscalingPolicy(
        AutoscalingConfig(
            min_replicas=1,
            max_replicas=50,
            target_num_ongoing_requests_per_replica=target_requests,
            upscale_delay_s=0.0,
            downscale_delay_s=0.0,
        )
    )

    decision = policy.get_decision_num_replicas(
        current_num_ongoing_requests=ongoing_requests,
        curr_target_num_replicas=4,
        current_handle_queued_queries=0,
    )

    # Total ongoing requests divided by the per-replica target.
    total_requests = sum(ongoing_requests)
    assert decision == total_requests / target_requests


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_bursty(serve_instance):
    """
    Sends requests in bursts of 100. Uses delays for smooth provisioning.
    """

    blocker = SignalActor.remote()

    autoscaling_options = {
        "metrics_interval_s": 0.1,
        "min_replicas": 1,
        "max_replicas": 2,
        "look_back_period_s": 0.2,
        "downscale_delay_s": 0.2,
        "upscale_delay_s": 0.2,
    }

    # Many queries are sent below; the short graceful shutdown timeout makes
    # sure replicas are killed quickly during cleanup.
    @serve.deployment(
        autoscaling_config=autoscaling_options,
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    )
    class A:
        def __call__(self):
            # Each request waits on the signal actor before returning.
            ray.get(blocker.wait.remote())

    handle = serve.run(A.bind())

    controller = serve_instance._controller
    initial_start_time = get_deployment_start_time(controller, A)

    def running_count():
        return get_num_running_replicas(controller, A)

    def send_burst():
        for _ in range(100):
            handle.remote()

    send_burst()

    wait_for_condition(lambda: running_count() >= 2)
    replicas_after_scale_up = running_count()
    blocker.send.remote()

    # Execute a bursty workload that issues 100 requests every 0.05 seconds.
    # The SignalActor is meant to let all requests in a burst queue up before
    # they are executed, and the send call then lets them drain again. This
    # bursty behavior should be smoothed by the delay parameters, so the
    # replica count is expected to stay put throughout.
    for _ in range(5):
        time.sleep(0.05)
        assert running_count() == replicas_after_scale_up
        send_burst()
        blocker.send.remote()

    # Once the queue drains, the deployment should shrink back down.
    wait_for_condition(lambda: running_count() <= 1)

    # Autoscaling must not have changed the deployment's start time.
    assert get_deployment_start_time(controller, A) == initial_start_time


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_intermediate_downscaling(serve_instance):
    """
    Scales up, then down, then up again, and finally back down to zero.
    """

    blocker = SignalActor.remote()

    autoscaling_options = {
        "metrics_interval_s": 0.1,
        "min_replicas": 0,
        "max_replicas": 20,
        "look_back_period_s": 0.2,
        "downscale_delay_s": 0.2,
        "upscale_delay_s": 0.2,
    }

    # Many queries are sent below; the short graceful shutdown timeout makes
    # sure replicas are killed quickly during cleanup.
    @serve.deployment(
        autoscaling_config=autoscaling_options,
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    )
    class A:
        def __call__(self):
            # Each request waits on the signal actor before returning.
            ray.get(blocker.wait.remote())

    handle = serve.run(A.bind())

    controller = serve_instance._controller
    initial_start_time = get_deployment_start_time(controller, A)

    def running_count():
        return get_num_running_replicas(controller, A)

    def send_requests(count):
        for _ in range(count):
            handle.remote()

    A.get_handle()
    send_requests(50)

    # First scale-up to the maximum.
    wait_for_condition(lambda: running_count() >= 20, timeout=30)
    blocker.send.remote()

    # Intermediate scale-down.
    wait_for_condition(lambda: running_count() <= 1, timeout=30)
    # NOTE(review): clear=True presumably resets the signal so the next batch
    # of requests blocks again -- confirm against SignalActor.
    blocker.send.remote(clear=True)

    # Second scale-up to the maximum.
    send_requests(50)
    wait_for_condition(lambda: running_count() >= 20, timeout=30)

    blocker.send.remote()
    # Once the queue drains, the deployment should shrink back to zero.
    wait_for_condition(lambda: running_count() < 1, timeout=30)

    # Autoscaling must not have changed the deployment's start time.
    assert get_deployment_start_time(controller, A) == initial_start_time


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
@pytest.mark.skip(reason="Currently failing with undefined behavior")
def test_e2e_update_autoscaling_deployment(serve_instance):
    """Redeploy an autoscaling deployment with new replica bounds under load.

    The deployment starts with bounds [0, 10], is redeployed with [2, 20]
    while requests are in flight (the original replicas must survive), and
    is finally redeployed with [0, 20] to check it can reach zero and come
    back up.
    """
    # See https://github.com/ray-project/ray/issues/21017 for details

    signal = SignalActor.remote()

    def autoscaling_config(min_replicas, max_replicas):
        """Build the autoscaling config; only the replica bounds vary."""
        return {
            "metrics_interval_s": 0.1,
            "min_replicas": min_replicas,
            "max_replicas": max_replicas,
            "look_back_period_s": 0.2,
            "downscale_delay_s": 0.2,
            "upscale_delay_s": 0.2,
        }

    @serve.deployment(
        autoscaling_config=autoscaling_config(0, 10),
        # We will send over a lot of queries. This will make sure replicas are
        # killed quickly during cleanup.
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    )
    class A:
        def __call__(self):
            # Each request waits on the signal actor before returning.
            ray.get(signal.wait.remote())

    def num_running():
        return get_num_running_replicas(controller, A)

    def send_requests(count):
        for _ in range(count):
            handle.remote()

    handle = serve.run(A.bind())
    print("Deployed A with min_replicas 0 and max_replicas 10.")

    controller = serve_instance._controller
    start_time = get_deployment_start_time(controller, A)

    assert num_running() == 0

    send_requests(400)
    print("Issued 400 requests.")

    wait_for_condition(lambda: num_running() >= 10)
    print("Scaled to 10 replicas.")
    first_deployment_replicas = get_running_replica_tags(controller, A)

    assert num_running() < 20

    send_requests(458)
    time.sleep(3)
    print("Issued 458 requests. Request routing in-progress.")

    # Raise both bounds while requests are still in flight.
    serve.run(
        A.options(
            autoscaling_config=autoscaling_config(2, 20),
            version="v1",
        ).bind()
    )
    print("Redeployed A.")

    wait_for_condition(lambda: num_running() >= 20)
    print("Scaled up to 20 replicas.")
    second_deployment_replicas = get_running_replica_tags(controller, A)

    # Confirm that none of the original replicas were de-provisioned
    assert_no_replicas_deprovisioned(
        first_deployment_replicas, second_deployment_replicas
    )

    signal.send.remote()

    # As the queue is drained, we should scale back down.
    wait_for_condition(lambda: num_running() <= 2)
    assert num_running() > 1

    # Make sure start time did not change for the deployment
    assert get_deployment_start_time(controller, A) == start_time

    # scale down to 0
    serve.run(
        A.options(
            autoscaling_config=autoscaling_config(0, 20),
            version="v1",
        ).bind()
    )
    print("Redeployed A.")

    wait_for_condition(lambda: num_running() < 1)
    assert num_running() == 0

    # scale up
    send_requests(400)
    wait_for_condition(lambda: num_running() > 0)
    signal.send.remote()
    wait_for_condition(lambda: num_running() < 1)


@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
def test_e2e_raise_min_replicas(serve_instance):
    """Raise min_replicas from 0 to 2 on a deployment that has one replica.

    The existing replica must be kept, and the deployment must settle at
    exactly two replicas without any new requests being issued.
    """
    blocker = SignalActor.remote()

    initial_autoscaling = {
        "metrics_interval_s": 0.1,
        "min_replicas": 0,
        "max_replicas": 10,
        "look_back_period_s": 0.2,
        "downscale_delay_s": 0.2,
        "upscale_delay_s": 0.2,
    }

    # Many queries may be in flight; the short graceful shutdown timeout makes
    # sure replicas are killed quickly during cleanup.
    @serve.deployment(
        autoscaling_config=initial_autoscaling,
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    )
    class A:
        def __call__(self):
            # Each request waits on the signal actor before returning.
            ray.get(blocker.wait.remote())

    A.deploy()
    print("Deployed A.")

    controller = serve_instance._controller
    initial_start_time = get_deployment_start_time(controller, A)

    def running_count():
        return get_num_running_replicas(controller, A)

    assert running_count() == 0

    handle = A.get_handle()
    handle.remote()
    print("Issued one request.")

    time.sleep(2)
    assert running_count() == 1
    print("Scale up to 1 replica.")

    tags_before_redeploy = get_running_replica_tags(controller, A)

    raised_autoscaling = {
        "metrics_interval_s": 0.1,
        "min_replicas": 2,
        "max_replicas": 10,
        "look_back_period_s": 0.2,
        "downscale_delay_s": 0.2,
        "upscale_delay_s": 0.2,
    }
    A.options(
        autoscaling_config=raised_autoscaling,
        graceful_shutdown_timeout_s=1,
        max_concurrent_queries=1000,
        version="v1",
    ).deploy()
    print("Redeployed A with min_replicas set to 2.")

    wait_for_condition(lambda: running_count() >= 2)
    time.sleep(5)

    # Even after waiting, the autoscaler must not go above 2 replicas.
    assert running_count() == 2
    print("Autoscaled to 2 without issuing any new requests.")

    tags_after_redeploy = get_running_replica_tags(controller, A)

    # None of the original replicas may have been de-provisioned.
    assert_no_replicas_deprovisioned(tags_before_redeploy, tags_after_redeploy)

    blocker.send.remote()
    time.sleep(1)
    print("Completed request.")

    # With the queue drained we should stay at the new minimum of 2.
    wait_for_condition(lambda: running_count() <= 2)
    assert running_count() > 1
    print("Stayed at 2 replicas.")

    # Redeploying must not have changed the deployment's start time.
    assert get_deployment_start_time(controller, A) == initial_start_time


if __name__ == "__main__":
    # Running this file directly runs its tests through pytest (verbose,
    # with output capturing disabled) and exits with pytest's return code.
    import sys
    import pytest

    pytest_exit_code = pytest.main(["-v", "-s", __file__])
    sys.exit(pytest_exit_code)