# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import ast
import base64
import itertools
import os
import pathlib
import signal
import struct
import tempfile
import threading
import time
import traceback
import json
from datetime import datetime
try:
import numpy as np
except ImportError:
np = None
import pytest
import pyarrow as pa
from pyarrow.lib import IpcReadOptions, tobytes
from pyarrow.util import find_free_port
from pyarrow.tests import util
try:
from pyarrow import flight
from pyarrow.flight import (
FlightClient, FlightServerBase,
ServerAuthHandler, ClientAuthHandler,
ServerMiddleware, ServerMiddlewareFactory,
ClientMiddleware, ClientMiddlewareFactory,
)
except ImportError:
flight = None
FlightClient, FlightServerBase = object, object
ServerAuthHandler, ClientAuthHandler = object, object
ServerMiddleware, ServerMiddlewareFactory = object, object
ClientMiddleware, ClientMiddlewareFactory = object, object
# Marks all of the tests in this module
# Ignore these with pytest ... -m 'not flight'
pytestmark = pytest.mark.flight
def test_import():
# So we see the ImportError somewhere
import pyarrow.flight # noqa
def resource_root():
"""Get the path to the test resources directory."""
if not os.environ.get("ARROW_TEST_DATA"):
raise RuntimeError("Test resources not found; set "
"ARROW_TEST_DATA to <repo root>/testing/data")
return pathlib.Path(os.environ["ARROW_TEST_DATA"]) / "flight"
def read_flight_resource(path):
"""Get the contents of a test resource file."""
root = resource_root()
if not root:
return None
try:
with (root / path).open("rb") as f:
return f.read()
except FileNotFoundError:
raise RuntimeError(
"Test resource {} not found; did you initialize the "
"test resource submodule?\n{}".format(root / path,
traceback.format_exc()))
def example_tls_certs():
"""Get the paths to test TLS certificates."""
return {
"root_cert": read_flight_resource("root-ca.pem"),
"certificates": [
flight.CertKeyPair(
cert=read_flight_resource("cert0.pem"),
key=read_flight_resource("cert0.key"),
),
flight.CertKeyPair(
cert=read_flight_resource("cert1.pem"),
key=read_flight_resource("cert1.key"),
),
]
}
def simple_ints_table():
data = [
pa.array([-10, -5, 0, 5, 10])
]
return pa.Table.from_arrays(data, names=['some_ints'])
def simple_dicts_table():
dict_values = pa.array(["foo", "baz", "quux"], type=pa.utf8())
data = [
pa.chunked_array([
pa.DictionaryArray.from_arrays([1, 0, None], dict_values),
pa.DictionaryArray.from_arrays([2, 1], dict_values)
])
]
return pa.Table.from_arrays(data, names=['some_dicts'])
def multiple_column_table():
return pa.Table.from_arrays([pa.array(['foo', 'bar', 'baz', 'qux']),
pa.array([1, 2, 3, 4])],
names=['a', 'b'])
class ConstantFlightServer(FlightServerBase):
"""A Flight server that always returns the same data.
See ARROW-4796: this server implementation will segfault if Flight
does not properly hold a reference to the Table object.
"""
CRITERIA = b"the expected criteria"
def __init__(self, location=None, options=None, **kwargs):
super().__init__(location, **kwargs)
# Ticket -> Table
self.table_factories = {
b'ints': simple_ints_table,
b'dicts': simple_dicts_table,
b'multi': multiple_column_table,
}
self.options = options
def list_flights(self, context, criteria):
if criteria == self.CRITERIA:
yield flight.FlightInfo(
pa.schema([]),
flight.FlightDescriptor.for_path('/foo'),
[]
)
def do_get(self, context, ticket):
# Return a fresh table, so that Flight is the only one keeping a
# reference.
table = self.table_factories[ticket.ticket]()
return flight.RecordBatchStream(table, options=self.options)
class MetadataFlightServer(FlightServerBase):
"""A Flight server that numbers incoming/outgoing data."""
def __init__(self, options=None, **kwargs):
super().__init__(**kwargs)
self.options = options
def do_get(self, context, ticket):
data = [
pa.array([-10, -5, 0, 5, 10])
]
table = pa.Table.from_arrays(data, names=['a'])
return flight.GeneratorStream(
table.schema,
self.number_batches(table),
options=self.options)
def do_put(self, context, descriptor, reader, writer):
counter = 0
expected_data = [-10, -5, 0, 5, 10]
for batch, buf in reader:
assert batch.equals(pa.RecordBatch.from_arrays(
[pa.array([expected_data[counter]])],
['a']
))
assert buf is not None
client_counter, = struct.unpack('<i', buf.to_pybytes())
assert counter == client_counter
writer.write(struct.pack('<i', counter))
counter += 1
@staticmethod
def number_batches(table):
for idx, batch in enumerate(table.to_batches()):
buf = struct.pack('<i', idx)
yield batch, buf
class EchoFlightServer(FlightServerBase):
"""A Flight server that returns the last data uploaded."""
def __init__(self, location=None, expected_schema=None, **kwargs):
super().__init__(location, **kwargs)
self.last_message = None
self.expected_schema = expected_schema
def do_get(self, context, ticket):
return flight.RecordBatchStream(self.last_message)
def do_put(self, context, descriptor, reader, writer):
if self.expected_schema:
assert self.expected_schema == reader.schema
self.last_message = reader.read_all()
def do_exchange(self, context, descriptor, reader, writer):
for chunk in reader:
pass
class EchoStreamFlightServer(EchoFlightServer):
"""An echo server that streams individual record batches."""
def do_get(self, context, ticket):
return flight.GeneratorStream(
self.last_message.schema,
self.last_message.to_batches(max_chunksize=1024))
def list_actions(self, context):
return []
def do_action(self, context, action):
if action.type == "who-am-i":
return [context.peer_identity(), context.peer().encode("utf-8")]
raise NotImplementedError
class GetInfoFlightServer(FlightServerBase):
"""A Flight server that tests GetFlightInfo."""
def get_flight_info(self, context, descriptor):
return flight.FlightInfo(
pa.schema([('a', pa.int32())]),
descriptor,
[
flight.FlightEndpoint(b'', ['grpc://test']),
flight.FlightEndpoint(
b'',
[flight.Location.for_grpc_tcp('localhost', 5005)],
pa.scalar("2023-04-05T12:34:56.789012345").cast(pa.timestamp("ns")),
"endpoint app metadata"
),
],
1,
42,
True,
"info app metadata"
)
def get_schema(self, context, descriptor):
info = self.get_flight_info(context, descriptor)
return flight.SchemaResult(info.schema)
class ListActionsFlightServer(FlightServerBase):
"""A Flight server that tests ListActions."""
@classmethod
def expected_actions(cls):
return [
("action-1", "description"),
("action-2", ""),
flight.ActionType("action-3", "more detail"),
]
def list_actions(self, context):
yield from self.expected_actions()
class ListActionsErrorFlightServer(FlightServerBase):
"""A Flight server that tests ListActions."""
def list_actions(self, context):
yield ("action-1", "")
yield "foo"
class CheckTicketFlightServer(FlightServerBase):
"""A Flight server that compares the given ticket to an expected value."""
def __init__(self, expected_ticket, location=None, **kwargs):
super().__init__(location, **kwargs)
self.expected_ticket = expected_ticket
def do_get(self, context, ticket):
assert self.expected_ticket == ticket.ticket
data1 = [pa.array([-10, -5, 0, 5, 10], type=pa.int32())]
table = pa.Table.from_arrays(data1, names=['a'])
return flight.RecordBatchStream(table)
def do_put(self, context, descriptor, reader):
self.last_message = reader.read_all()
class InvalidStreamFlightServer(FlightServerBase):
"""A Flight server that tries to return messages with differing schemas."""
schema = pa.schema([('a', pa.int32())])
def do_get(self, context, ticket):
data1 = [pa.array([-10, -5, 0, 5, 10], type=pa.int32())]
data2 = [pa.array([-10.0, -5.0, 0.0, 5.0, 10.0], type=pa.float64())]
assert data1.type != data2.type
table1 = pa.Table.from_arrays(data1, names=['a'])
table2 = pa.Table.from_arrays(data2, names=['a'])
assert table1.schema == self.schema
return flight.GeneratorStream(self.schema, [table1, table2])
class NeverSendsDataFlightServer(FlightServerBase):
"""A Flight server that never actually yields data."""
schema = pa.schema([('a', pa.int32())])
def do_get(self, context, ticket):
if ticket.ticket == b'yield_data':
# Check that the server handler will ignore empty tables
# up to a certain extent
data = [
self.schema.empty_table(),
self.schema.empty_table(),
pa.RecordBatch.from_arrays([range(5)], schema=self.schema),
]
return flight.GeneratorStream(self.schema, data)
return flight.GeneratorStream(
self.schema, itertools.repeat(self.schema.empty_table()))
class SlowFlightServer(FlightServerBase):
"""A Flight server that delays its responses to test timeouts."""
Loading ...