# Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

# arrow-nightlies / pyarrow   python

# Repository URL to install this package:

# Version: 19.0.0.dev65

# / _flight.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

import collections
import enum
import re
import time
import warnings
import weakref

from cython.operator cimport dereference as deref
from cython.operator cimport postincrement
from libcpp cimport bool as c_bool

from pyarrow.lib cimport *
from pyarrow.lib import (ArrowCancelled, ArrowException, ArrowInvalid,
                         SignalStopHandler)
from pyarrow.lib import as_buffer, frombytes, timestamp, tobytes
from pyarrow.includes.libarrow_flight cimport *
from pyarrow.ipc import _get_legacy_format_default, _ReadPandasMixin
import pyarrow.lib as lib


# Module-level, default-constructed call options. FlightCallOptions.unwrap
# returns a pointer to this object when the caller supplies no options.
cdef CFlightCallOptions DEFAULT_CALL_OPTIONS


# Translate a C++ status coming out of a Flight call into a Python exception.
#
# Returns 0 when ``status`` is OK. Otherwise:
#   * if the status carries a FlightStatusDetail whose code matches one of
#     the branches below, the corresponding Flight*Error is raised with the
#     status message and the detail's extra_info;
#   * else, if it carries a FlightWriteSizeStatusDetail, a
#     FlightWriteSizeExceededError is raised with the limit and actual size;
#   * else the status is handed to the generic check_status().
#
# The function is declared ``nogil``; the GIL is re-acquired only inside the
# ``with gil`` sections that build Python objects and raise.
cdef int check_flight_status(const CStatus& status) except -1 nogil:
    cdef shared_ptr[FlightStatusDetail] detail

    # Fast path: nothing to translate.
    if status.ok():
        return 0

    detail = FlightStatusDetail.UnwrapStatus(status)
    if detail:
        with gil:
            message = frombytes(status.message(), safe=True)
            detail_msg = detail.get().extra_info()
            if detail.get().code() == CFlightStatusInternal:
                raise FlightInternalError(message, detail_msg)
            elif detail.get().code() == CFlightStatusFailed:
                # Only the "failed" code has its message rewritten, via
                # _munge_grpc_python_error, before being raised.
                message = _munge_grpc_python_error(message)
                raise FlightServerError(message, detail_msg)
            elif detail.get().code() == CFlightStatusTimedOut:
                raise FlightTimedOutError(message, detail_msg)
            elif detail.get().code() == CFlightStatusCancelled:
                raise FlightCancelledError(message, detail_msg)
            elif detail.get().code() == CFlightStatusUnauthenticated:
                raise FlightUnauthenticatedError(message, detail_msg)
            elif detail.get().code() == CFlightStatusUnauthorized:
                raise FlightUnauthorizedError(message, detail_msg)
            elif detail.get().code() == CFlightStatusUnavailable:
                raise FlightUnavailableError(message, detail_msg)
            # A detail whose code matches none of the above raises nothing
            # here and falls through to the checks below.

    size_detail = FlightWriteSizeStatusDetail.UnwrapStatus(status)
    if size_detail:
        with gil:
            message = frombytes(status.message(), safe=True)
            raise FlightWriteSizeExceededError(
                message,
                size_detail.get().limit(), size_detail.get().actual())

    # No Flight-specific detail applied: defer to the generic status check.
    return check_status(status)


# Matches the message of a failed Flight RPC whose detail carries a Python
# exception. Group 1 is the RPC failure message, group 2 is the text that
# follows "Python exception: ". DOTALL lets both groups span newlines.
#
# The period in front of " Detail: " is escaped so that only a literal "."
# is accepted as the separator; an unescaped "." would match any character
# and silently drop it from group 1.
_FLIGHT_SERVER_ERROR_REGEX = re.compile(
    r'Flight RPC failed with message: (.*)\. Detail: '
    r'Python exception: (.*)',
    re.DOTALL
)


def _munge_grpc_python_error(message):
    """Rewrite a Flight RPC failure message that wraps a Python exception.

    Parameters
    ----------
    message : str
        The status message to inspect.

    Returns
    -------
    str
        ``'Flight RPC failed with Python exception "<exc>: <msg>"'`` when
        ``message`` has the form
        ``'Flight RPC failed with message: <msg>. Detail: Python exception:
        <exc>'``; otherwise ``message`` unchanged.
    """
    m = _FLIGHT_SERVER_ERROR_REGEX.match(message)
    if m is None:
        # Not in the recognised shape: leave the message untouched.
        return message
    return ('Flight RPC failed with Python exception "{}: {}"'
            .format(m.group(2), m.group(1)))


cdef IpcWriteOptions _get_options(options):
    # Delegate to pyarrow.ipc's _get_legacy_format_default without an explicit
    # legacy-format choice, then cast the result to IpcWriteOptions.
    resolved = _get_legacy_format_default(
        options=options, use_legacy_format=None)
    return <IpcWriteOptions> resolved


cdef class FlightCallOptions(_Weakrefable):
    """RPC-layer options for a Flight call."""

    cdef CFlightCallOptions options

    def __init__(self, timeout=None, write_options=None, headers=None,
                 IpcReadOptions read_options=None):
        """Create call options.

        Each argument is optional; an argument left as None leaves the
        corresponding field of the underlying options at its default.

        Parameters
        ----------
        timeout : float, None
            A timeout for the call, in seconds. None means that the
            timeout defaults to an implementation-specific value.
        write_options : pyarrow.ipc.IpcWriteOptions, optional
            IPC write options. The default options can be controlled
            by environment variables (see pyarrow.ipc).
        headers : List[Tuple[str, str]], optional
            A list of arbitrary headers as key, value tuples
        read_options : pyarrow.ipc.IpcReadOptions, optional
            Serialization options for reading IPC format.

        Raises
        ------
        TypeError
            If ``read_options`` is not an IpcReadOptions instance.
        """
        cdef IpcWriteOptions ipc_write_opts

        if timeout is not None:
            self.options.timeout = CTimeoutDuration(timeout)

        if write_options is not None:
            # Normalise through _get_options before copying the C++ struct.
            ipc_write_opts = _get_options(write_options)
            self.options.write_options = ipc_write_opts.c_options

        if read_options is not None:
            if not isinstance(read_options, IpcReadOptions):
                raise TypeError(
                    f"expected IpcReadOptions, got {type(read_options)}")
            self.options.read_options = read_options.c_options

        if headers is not None:
            self.options.headers = headers

    @staticmethod
    cdef CFlightCallOptions* unwrap(obj):
        # Return a pointer to the C++ options held by ``obj``; a falsy
        # ``obj`` (e.g. None) selects the module-level defaults.
        if not obj:
            return &DEFAULT_CALL_OPTIONS
        if isinstance(obj, FlightCallOptions):
            return &((<FlightCallOptions> obj).options)
        raise TypeError(
            f"Expected a FlightCallOptions object, not '{type(obj)}'")


_CertKeyPair = collections.namedtuple('_CertKeyPair', 'cert key')


class CertKeyPair(_CertKeyPair):
    """A TLS certificate together with its key, for use in Flight.

    A two-field named tuple: ``cert`` followed by ``key``.
    """


cdef class FlightError(Exception):
    """
    Root of the Flight-specific exception hierarchy.

    Servers may raise this class, or any subclass of it, to hand a more
    detailed error back to the client.

    Parameters
    ----------
    message : str, optional
        The error message.
    extra_info : bytes, optional
        Extra binary error details that were provided by the
        server/will be sent to the client.

    Attributes
    ----------
    extra_info : bytes
        Extra binary error details that were provided by the
        server/will be sent to the client.
    """

    # Give instances a __dict__ so attributes such as extra_info can be set.
    cdef dict __dict__

    def __init__(self, message='', extra_info=b''):
        super().__init__(message)
        # Stored after conversion through tobytes().
        self.extra_info = tobytes(extra_info)

    cdef CStatus to_status(self):
        # The base class has no dedicated Flight status code, so it is
        # reported as an unknown error with a "Flight error: " prefix.
        return CStatus_UnknownError(tobytes(f"Flight error: {self!s}"))


cdef class FlightInternalError(FlightError, ArrowException):
    """Raised when an error internal to the Flight server occurred."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "internal" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusInternal, message,
                               self.extra_info)


cdef class FlightTimedOutError(FlightError, ArrowException):
    """Raised when the Flight RPC call timed out."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "timed out" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusTimedOut, message,
                               self.extra_info)


cdef class FlightCancelledError(FlightError, ArrowCancelled):
    """Raised when the operation was cancelled."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "cancelled" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusCancelled, message,
                               self.extra_info)


cdef class FlightServerError(FlightError, ArrowException):
    """Raised when a server error occurred."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "failed" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusFailed, message,
                               self.extra_info)


cdef class FlightUnauthenticatedError(FlightError, ArrowException):
    """Raised when the client is not authenticated."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "unauthenticated" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusUnauthenticated, message,
                               self.extra_info)


cdef class FlightUnauthorizedError(FlightError, ArrowException):
    """Raised when the client may not perform the given operation."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "unauthorized" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusUnauthorized, message,
                               self.extra_info)


cdef class FlightUnavailableError(FlightError, ArrowException):
    """Raised when the server is not reachable or available."""

    cdef CStatus to_status(self):
        # Build a Flight status with the "unavailable" code from this
        # exception's text and its extra_info payload.
        message = tobytes(str(self))
        return MakeFlightError(CFlightStatusUnavailable, message,
                               self.extra_info)


class FlightWriteSizeExceededError(ArrowInvalid):
    """A write operation exceeded the client-configured limit.

    Attributes
    ----------
    limit
        The limit value supplied when the error was created.
    actual
        The actual value supplied when the error was created.
    """

    def __init__(self, message, limit, actual):
        super().__init__(message)
        # Keep both numbers on the instance so callers can inspect them.
        self.actual = actual
        self.limit = limit


cdef class Action(_Weakrefable):
    """An action executable on a Flight service.

    Wraps a C++ ``CAction`` made of a type string and a body buffer.
    """
    cdef:
        CAction action

    def __init__(self, action_type, buf):
        """Create an action from a type and a buffer.

        Parameters
        ----------
        action_type : bytes or str
            The action type; converted with ``tobytes``.
        buf : Buffer or bytes-like object
            The action body; converted with ``as_buffer``.
        """
        self.action.type = tobytes(action_type)
        self.action.body = pyarrow_unwrap_buffer(as_buffer(buf))

    @property
    def type(self):
        """The action type."""
        return frombytes(self.action.type)

    @property
    def body(self):
        """The action body (arguments for the action)."""
        return pyarrow_wrap_buffer(self.action.body)

    @staticmethod
    cdef CAction unwrap(action) except *:
        # Return a copy of the wrapped C++ action, rejecting any other type.
        if not isinstance(action, Action):
            raise TypeError("Must provide Action, not '{}'".format(
                type(action)))
        return (<Action> action).action

    def serialize(self):
        """Get the wire-format representation of this type.

        Useful when interoperating with non-Flight systems (e.g. REST
        services) that may want to return Flight types.

        """
        return GetResultValue(self.action.SerializeToString())

    @classmethod
    def deserialize(cls, serialized):
        """Parse the wire-format representation of this type.

        Useful when interoperating with non-Flight systems (e.g. REST
        services) that may want to return Flight types.

        Parameters
        ----------
        serialized : bytes or str
            The serialized action; converted with ``tobytes`` before parsing.

        Returns
        -------
        Action
        """
        # Bypass __init__: the C++ member is filled in directly below.
        cdef Action action = Action.__new__(Action)
        action.action = GetResultValue(
            CAction.Deserialize(tobytes(serialized)))
        return action

    def __eq__(self, other):
        """Compare the wrapped C++ actions for equality.

        Objects that are not ``Action`` instances (including None) yield
        ``NotImplemented`` so that ``action == other`` evaluates to False
        through Python's default fallback instead of raising TypeError.
        """
        if not isinstance(other, Action):
            return NotImplemented
        return self.action == (<Action> other).action

    def __repr__(self):
        return (f"<pyarrow.flight.Action type={self.type!r} "
                f"body=({self.body.size} bytes)>")


_ActionType = collections.namedtuple('_ActionType', 'type description')


class ActionType(_ActionType):
    """A type of action that is executable on a Flight service.

    A two-field named tuple: ``type`` followed by ``description``.
    """

    def make_action(self, buf):
        """Create an Action with this type.

        Parameters
        ----------
        buf : obj
            An Arrow buffer or Python bytes or bytes-like object.

        Returns
        -------
        Action
            An action whose type is ``self.type`` and whose body is ``buf``.
        """
        action_type = self.type
        return Action(action_type, buf)


cdef class Result(_Weakrefable):
    """A result from executing an Action."""
    cdef:
        unique_ptr[CFlightResult] result

    def __init__(self, buf):
        """Create a new result.

        Parameters
        ----------
        buf : Buffer or bytes-like object
Loading ...