# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3
import collections
import enum
import re
import time
import warnings
import weakref
from cython.operator cimport dereference as deref
from cython.operator cimport postincrement
from libcpp cimport bool as c_bool
from pyarrow.lib cimport *
from pyarrow.lib import (ArrowCancelled, ArrowException, ArrowInvalid,
SignalStopHandler)
from pyarrow.lib import as_buffer, frombytes, timestamp, tobytes
from pyarrow.includes.libarrow_flight cimport *
from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin
import pyarrow.lib as lib
# Shared module-level call options. FlightCallOptions.unwrap() hands out a
# pointer to this object whenever the caller passes a falsy value (e.g. None)
# instead of a FlightCallOptions instance.
cdef CFlightCallOptions DEFAULT_CALL_OPTIONS
cdef int check_flight_status(const CStatus& status) except -1 nogil:
    # Convert a C++ Status into a Python exception.
    #
    # Returns 0 when ``status`` is OK.  Otherwise:
    #   * a status carrying a FlightStatusDetail with a recognised code is
    #     raised as the matching Flight*Error, with the status message and
    #     the detail's extra_info as constructor arguments;
    #   * a status carrying a FlightWriteSizeStatusDetail is raised as
    #     FlightWriteSizeExceededError with the detail's limit/actual values;
    #   * anything else is handed to the generic check_status().
    cdef shared_ptr[FlightStatusDetail] flight_detail

    if status.ok():
        return 0

    flight_detail = FlightStatusDetail.UnwrapStatus(status)
    if flight_detail:
        # This function runs without the GIL; it must be re-acquired before
        # Python objects are created and raised.
        with gil:
            text = frombytes(status.message(), safe=True)
            extra = flight_detail.get().extra_info()
            if flight_detail.get().code() == CFlightStatusInternal:
                raise FlightInternalError(text, extra)
            elif flight_detail.get().code() == CFlightStatusFailed:
                # Messages that embed a Python exception are rewritten into
                # a more readable form before being surfaced.
                raise FlightServerError(
                    _munge_grpc_python_error(text), extra)
            elif flight_detail.get().code() == CFlightStatusTimedOut:
                raise FlightTimedOutError(text, extra)
            elif flight_detail.get().code() == CFlightStatusCancelled:
                raise FlightCancelledError(text, extra)
            elif flight_detail.get().code() == CFlightStatusUnauthenticated:
                raise FlightUnauthenticatedError(text, extra)
            elif flight_detail.get().code() == CFlightStatusUnauthorized:
                raise FlightUnauthorizedError(text, extra)
            elif flight_detail.get().code() == CFlightStatusUnavailable:
                raise FlightUnavailableError(text, extra)
        # A code not listed above falls through to the checks below.

    write_size_detail = FlightWriteSizeStatusDetail.UnwrapStatus(status)
    if write_size_detail:
        with gil:
            text = frombytes(status.message(), safe=True)
            raise FlightWriteSizeExceededError(
                text,
                write_size_detail.get().limit(),
                write_size_detail.get().actual())

    return check_status(status)
# Recognises status messages shaped like
#   "Flight RPC failed with message: <msg>. Detail: Python exception: <exc>"
# re.DOTALL lets both captured parts span several lines.
_FLIGHT_SERVER_ERROR_REGEX = re.compile(
    r'Flight RPC failed with message: (.*). Detail: '
    r'Python exception: (.*)',
    re.DOTALL
)


def _munge_grpc_python_error(message):
    """Rewrite a Flight RPC failure message that wraps a Python exception.

    Parameters
    ----------
    message : str
        The status message to inspect.

    Returns
    -------
    str
        A message of the form
        ``Flight RPC failed with Python exception "<exc>: <msg>"`` when
        ``message`` matches the expected pattern from its start; otherwise
        ``message`` unchanged.
    """
    found = _FLIGHT_SERVER_ERROR_REGEX.match(message)
    if found is None:
        return message
    # Group 1 is the RPC message, group 2 the Python exception text; the
    # rewritten message puts the exception first.
    rpc_message, python_exception = found.groups()
    return ('Flight RPC failed with Python exception \"{}: {}\"'
            .format(python_exception, rpc_message))
cdef IpcWriteOptions _get_options(options):
    # Run ``options`` through pyarrow.ipc's legacy-format defaulting helper
    # (without an explicit ``use_legacy_format`` override) and return the
    # result typed as IpcWriteOptions.
    resolved = _get_legacy_format_default(
        use_legacy_format=None, options=options)
    return <IpcWriteOptions> resolved
cdef class FlightCallOptions(_Weakrefable):
    """RPC-layer options for a Flight call."""

    cdef:
        CFlightCallOptions options

    def __init__(self, timeout=None, write_options=None, headers=None,
                 IpcReadOptions read_options=None):
        """Create call options.

        Parameters
        ----------
        timeout : float, None
            A timeout for the call, in seconds. None means that the
            timeout defaults to an implementation-specific value.
        write_options : pyarrow.ipc.IpcWriteOptions, optional
            IPC write options. The default options can be controlled
            by environment variables (see pyarrow.ipc).
        headers : List[Tuple[str, str]], optional
            A list of arbitrary headers as key, value tuples. Keys and
            values may be given as str or bytes; each is passed through
            ``tobytes`` before being stored.
        read_options : pyarrow.ipc.IpcReadOptions, optional
            Serialization options for reading IPC format.

        Raises
        ------
        TypeError
            If ``read_options`` is not an IpcReadOptions instance.
        """
        cdef IpcWriteOptions c_write_options

        if timeout is not None:
            self.options.timeout = CTimeoutDuration(timeout)
        if write_options is not None:
            c_write_options = _get_options(write_options)
            self.options.write_options = c_write_options.c_options
        if read_options is not None:
            if not isinstance(read_options, IpcReadOptions):
                raise TypeError("expected IpcReadOptions, got {}"
                                .format(type(read_options)))
            self.options.read_options = read_options.c_options
        if headers is not None:
            # The C++ side stores headers as pairs of byte strings.
            # Normalise every key/value through tobytes() so that the
            # documented ``str`` tuples work alongside ``bytes`` tuples,
            # independent of how Cython is configured to coerce strings.
            self.options.headers = [
                (tobytes(key), tobytes(value)) for key, value in headers]

    @staticmethod
    cdef CFlightCallOptions* unwrap(obj):
        # Return a pointer to the C++ options held by ``obj``.  A falsy
        # ``obj`` (e.g. None) selects the shared module-level defaults;
        # anything that is not a FlightCallOptions raises TypeError.
        if not obj:
            return &DEFAULT_CALL_OPTIONS
        elif isinstance(obj, FlightCallOptions):
            return &((<FlightCallOptions> obj).options)
        raise TypeError("Expected a FlightCallOptions object, not "
                        "'{}'".format(type(obj)))
# Plain (cert, key) tuple that backs the public CertKeyPair class.
_CertKeyPair = collections.namedtuple('_CertKeyPair', ('cert', 'key'))


class CertKeyPair(_CertKeyPair):
    """A TLS certificate and key for use in Flight.

    Behaves as a two-field named tuple with fields ``cert`` and ``key``,
    in that order.
    """
cdef class FlightError(Exception):
    """
    Root of the Flight-specific exception hierarchy.

    Server code may raise this class, or one of its subclasses, to give
    clients a more detailed error.

    Parameters
    ----------
    message : str, optional
        The error message.
    extra_info : bytes, optional
        Extra binary error details that were provided by the
        server/will be sent to the client.

    Attributes
    ----------
    extra_info : bytes
        Extra binary error details that were provided by the
        server/will be sent to the client.
    """

    # Give instances a __dict__ so that ``extra_info`` can be stored as a
    # regular Python attribute on this extension type.
    cdef dict __dict__

    def __init__(self, message='', extra_info=b''):
        super().__init__(message)
        self.extra_info = tobytes(extra_info)

    cdef CStatus to_status(self):
        # The base class has no dedicated Flight status code: it is reported
        # as an unknown error whose message is prefixed with "Flight error: ".
        prefixed = "Flight error: {}".format(str(self))
        return CStatus_UnknownError(tobytes(prefixed))
cdef class FlightInternalError(FlightError, ArrowException):
    """An error internal to the Flight server occurred."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "internal" code, carrying this
        # exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusInternal, encoded,
                               self.extra_info)
cdef class FlightTimedOutError(FlightError, ArrowException):
    """The Flight RPC call timed out."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "timed out" code, carrying this
        # exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusTimedOut, encoded,
                               self.extra_info)
cdef class FlightCancelledError(FlightError, ArrowCancelled):
    """The operation was cancelled."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "cancelled" code, carrying this
        # exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusCancelled, encoded,
                               self.extra_info)
cdef class FlightServerError(FlightError, ArrowException):
    """A server error occurred."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "failed" code, carrying this
        # exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusFailed, encoded,
                               self.extra_info)
cdef class FlightUnauthenticatedError(FlightError, ArrowException):
    """The client is not authenticated."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "unauthenticated" code, carrying
        # this exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusUnauthenticated, encoded,
                               self.extra_info)
cdef class FlightUnauthorizedError(FlightError, ArrowException):
    """The client is not authorized to perform the given operation."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "unauthorized" code, carrying this
        # exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusUnauthorized, encoded,
                               self.extra_info)
cdef class FlightUnavailableError(FlightError, ArrowException):
    """The server is not reachable or available."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "unavailable" code, carrying this
        # exception's text and its extra_info payload.
        encoded = tobytes(str(self))
        return MakeFlightError(CFlightStatusUnavailable, encoded,
                               self.extra_info)
class FlightWriteSizeExceededError(ArrowInvalid):
    """A write operation exceeded the client-configured limit.

    Parameters
    ----------
    message : str
        The error message.
    limit
        The limit value associated with the failed write.
    actual
        The actual value associated with the failed write.

    Attributes
    ----------
    limit
        Same value as the ``limit`` parameter.
    actual
        Same value as the ``actual`` parameter.
    """

    def __init__(self, message, limit, actual):
        super().__init__(message)
        # Keep both figures on the exception so handlers can inspect them.
        self.limit, self.actual = limit, actual
cdef class Action(_Weakrefable):
    """An action executable on a Flight service."""

    cdef:
        CAction action

    def __init__(self, action_type, buf):
        """Create an action from a type and a buffer.

        Parameters
        ----------
        action_type : bytes or str
        buf : Buffer or bytes-like object
        """
        self.action.type = tobytes(action_type)
        self.action.body = pyarrow_unwrap_buffer(as_buffer(buf))

    @property
    def type(self):
        """The action type."""
        return frombytes(self.action.type)

    @property
    def body(self):
        """The action body (arguments for the action)."""
        return pyarrow_wrap_buffer(self.action.body)

    @staticmethod
    cdef CAction unwrap(action) except *:
        # Return the C++ action held by ``action``; raise TypeError when
        # ``action`` is not an Action instance.
        if not isinstance(action, Action):
            raise TypeError("Must provide Action, not '{}'".format(
                type(action)))
        return (<Action> action).action

    def serialize(self):
        """Get the wire-format representation of this type.

        Useful when interoperating with non-Flight systems (e.g. REST
        services) that may want to return Flight types.
        """
        return GetResultValue(self.action.SerializeToString())

    @classmethod
    def deserialize(cls, serialized):
        """Parse the wire-format representation of this type.

        Useful when interoperating with non-Flight systems (e.g. REST
        services) that may want to return Flight types.
        """
        # Bypass __init__: the C++ action is filled in directly from the
        # deserialized value.
        cdef Action action = Action.__new__(Action)
        action.action = GetResultValue(
            CAction.Deserialize(tobytes(serialized)))
        return action

    def __eq__(self, other):
        # Take ``other`` untyped and check it explicitly.  A typed
        # ``Action other`` parameter raises TypeError for foreign objects
        # and lets None through, after which reading ``other.action`` is
        # unsafe.  Returning NotImplemented lets Python fall back to its
        # default comparison, so ``action == None`` is simply False.
        if not isinstance(other, Action):
            return NotImplemented
        return self.action == (<Action> other).action

    def __repr__(self):
        return (f"<pyarrow.flight.Action type={self.type!r} "
                f"body=({self.body.size} bytes)>")
# Plain (type, description) tuple that backs the public ActionType class.
_ActionType = collections.namedtuple('_ActionType', ('type', 'description'))


class ActionType(_ActionType):
    """A type of action that is executable on a Flight service.

    Behaves as a two-field named tuple with fields ``type`` and
    ``description``, in that order.
    """

    def make_action(self, buf):
        """Create an Action with this type.

        Parameters
        ----------
        buf : obj
            An Arrow buffer or Python bytes or bytes-like object.

        Returns
        -------
        Action
            A new Action built from this tuple's ``type`` field and ``buf``.
        """
        kind = self.type
        return Action(kind, buf)
cdef class Result(_Weakrefable):
"""A result from executing an Action."""
cdef:
unique_ptr[CFlightResult] result
def __init__(self, buf):
"""Create a new result.
Parameters
----------
buf : Buffer or bytes-like object
Loading ...