Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ tests / test_flight_async.py

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

import asyncio

import pytest

import pyarrow

flight = pytest.importorskip("pyarrow.flight")
pytestmark = pytest.mark.flight


class ExampleServer(flight.FlightServerBase):
    simple_info = flight.FlightInfo(
        pyarrow.schema([("a", "int32")]),
        flight.FlightDescriptor.for_command(b"simple"),
        [],
        -1,
        -1,
    )

    def get_flight_info(self, context, descriptor):
        if descriptor.command == b"simple":
            return self.simple_info
        elif descriptor.command == b"unknown":
            raise NotImplementedError("Unknown command")

        raise NotImplementedError("Unknown descriptor")


def async_or_skip(client):
    if not client.supports_async:
        # Use async error message as skip message
        with pytest.raises(NotImplementedError) as e:
            client.as_async()
        pytest.skip(str(e.value))


@pytest.fixture(scope="module")
def flight_client():
    with ExampleServer() as server:
        with flight.connect(f"grpc://localhost:{server.port}") as client:
            yield client


@pytest.fixture(scope="module")
def async_client(flight_client):
    async_or_skip(flight_client)
    yield flight_client.as_async()


def test_async_support_property(flight_client):
    assert isinstance(flight_client.supports_async, bool)
    if flight_client.supports_async:
        flight_client.as_async()
    else:
        with pytest.raises(NotImplementedError):
            flight_client.as_async()


def test_get_flight_info(async_client):
    async def _test():
        descriptor = flight.FlightDescriptor.for_command(b"simple")
        info = await async_client.get_flight_info(descriptor)
        assert info == ExampleServer.simple_info

    asyncio.run(_test())


def test_get_flight_info_error(async_client):
    async def _test():
        descriptor = flight.FlightDescriptor.for_command(b"unknown")
        with pytest.raises(NotImplementedError) as excinfo:
            await async_client.get_flight_info(descriptor)

        assert "Unknown command" in repr(excinfo.value)

    asyncio.run(_test())