Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev246 

/ src / arrow / python / flight.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>
#include <string>
#include <vector>

#include "arrow/flight/api.h"
#include "arrow/ipc/dictionary.h"
#include "arrow/python/common.h"

#if defined(_WIN32) || defined(__CYGWIN__)  // Windows
#  if defined(_MSC_VER)
#    pragma warning(disable : 4251)
#  else
#    pragma GCC diagnostic ignored "-Wattributes"
#  endif

#  ifdef ARROW_PYTHON_STATIC
#    define ARROW_PYFLIGHT_EXPORT
#  elif defined(ARROW_PYFLIGHT_EXPORTING)
#    define ARROW_PYFLIGHT_EXPORT __declspec(dllexport)
#  else
#    define ARROW_PYFLIGHT_EXPORT __declspec(dllimport)
#  endif

#else  // Not Windows
#  ifndef ARROW_PYFLIGHT_EXPORT
#    define ARROW_PYFLIGHT_EXPORT __attribute__((visibility("default")))
#  endif
#endif  // Non-Windows

namespace arrow {

namespace py {

namespace flight {

ARROW_PYFLIGHT_EXPORT
extern const char* kPyServerMiddlewareName;

/// \brief A table of function pointers for calling from C++ into
/// Python.
class ARROW_PYFLIGHT_EXPORT PyFlightServerVtable {
 public:
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       const arrow::flight::Criteria*,
                       std::unique_ptr<arrow::flight::FlightListing>*)>
      list_flights;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       const arrow::flight::FlightDescriptor&,
                       std::unique_ptr<arrow::flight::FlightInfo>*)>
      get_flight_info;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       const arrow::flight::FlightDescriptor&,
                       std::unique_ptr<arrow::flight::SchemaResult>*)>
      get_schema;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       const arrow::flight::Ticket&,
                       std::unique_ptr<arrow::flight::FlightDataStream>*)>
      do_get;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       std::unique_ptr<arrow::flight::FlightMessageReader>,
                       std::unique_ptr<arrow::flight::FlightMetadataWriter>)>
      do_put;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       std::unique_ptr<arrow::flight::FlightMessageReader>,
                       std::unique_ptr<arrow::flight::FlightMessageWriter>)>
      do_exchange;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       const arrow::flight::Action&,
                       std::unique_ptr<arrow::flight::ResultStream>*)>
      do_action;
  std::function<Status(PyObject*, const arrow::flight::ServerCallContext&,
                       std::vector<arrow::flight::ActionType>*)>
      list_actions;
};

class ARROW_PYFLIGHT_EXPORT PyServerAuthHandlerVtable {
 public:
  std::function<Status(PyObject*, arrow::flight::ServerAuthSender*,
                       arrow::flight::ServerAuthReader*)>
      authenticate;
  std::function<Status(PyObject*, const std::string&, std::string*)> is_valid;
};

class ARROW_PYFLIGHT_EXPORT PyClientAuthHandlerVtable {
 public:
  std::function<Status(PyObject*, arrow::flight::ClientAuthSender*,
                       arrow::flight::ClientAuthReader*)>
      authenticate;
  std::function<Status(PyObject*, std::string*)> get_token;
};

/// \brief A helper to implement an auth mechanism in Python.
class ARROW_PYFLIGHT_EXPORT PyServerAuthHandler
    : public arrow::flight::ServerAuthHandler {
 public:
  explicit PyServerAuthHandler(PyObject* handler,
                               const PyServerAuthHandlerVtable& vtable);
  Status Authenticate(arrow::flight::ServerAuthSender* outgoing,
                      arrow::flight::ServerAuthReader* incoming) override;
  Status IsValid(const std::string& token, std::string* peer_identity) override;

 private:
  OwnedRefNoGIL handler_;
  PyServerAuthHandlerVtable vtable_;
};

/// \brief A helper to implement an auth mechanism in Python.
class ARROW_PYFLIGHT_EXPORT PyClientAuthHandler
    : public arrow::flight::ClientAuthHandler {
 public:
  explicit PyClientAuthHandler(PyObject* handler,
                               const PyClientAuthHandlerVtable& vtable);
  Status Authenticate(arrow::flight::ClientAuthSender* outgoing,
                      arrow::flight::ClientAuthReader* incoming) override;
  Status GetToken(std::string* token) override;

 private:
  OwnedRefNoGIL handler_;
  PyClientAuthHandlerVtable vtable_;
};

class ARROW_PYFLIGHT_EXPORT PyFlightServer : public arrow::flight::FlightServerBase {
 public:
  explicit PyFlightServer(PyObject* server, const PyFlightServerVtable& vtable);

  // Like Serve(), but set up signals and invoke Python signal handlers
  // if necessary.  This function may return with a Python exception set.
  Status ServeWithSignals();

  Status ListFlights(const arrow::flight::ServerCallContext& context,
                     const arrow::flight::Criteria* criteria,
                     std::unique_ptr<arrow::flight::FlightListing>* listings) override;
  Status GetFlightInfo(const arrow::flight::ServerCallContext& context,
                       const arrow::flight::FlightDescriptor& request,
                       std::unique_ptr<arrow::flight::FlightInfo>* info) override;
  Status GetSchema(const arrow::flight::ServerCallContext& context,
                   const arrow::flight::FlightDescriptor& request,
                   std::unique_ptr<arrow::flight::SchemaResult>* result) override;
  Status DoGet(const arrow::flight::ServerCallContext& context,
               const arrow::flight::Ticket& request,
               std::unique_ptr<arrow::flight::FlightDataStream>* stream) override;
  Status DoPut(const arrow::flight::ServerCallContext& context,
               std::unique_ptr<arrow::flight::FlightMessageReader> reader,
               std::unique_ptr<arrow::flight::FlightMetadataWriter> writer) override;
  Status DoExchange(const arrow::flight::ServerCallContext& context,
                    std::unique_ptr<arrow::flight::FlightMessageReader> reader,
                    std::unique_ptr<arrow::flight::FlightMessageWriter> writer) override;
  Status DoAction(const arrow::flight::ServerCallContext& context,
                  const arrow::flight::Action& action,
                  std::unique_ptr<arrow::flight::ResultStream>* result) override;
  Status ListActions(const arrow::flight::ServerCallContext& context,
                     std::vector<arrow::flight::ActionType>* actions) override;

 private:
  OwnedRefNoGIL server_;
  PyFlightServerVtable vtable_;
};

/// \brief A callback that obtains the next result from a Flight action.
typedef std::function<Status(PyObject*, std::unique_ptr<arrow::flight::Result>*)>
    PyFlightResultStreamCallback;

/// \brief A ResultStream built around a Python callback.
class ARROW_PYFLIGHT_EXPORT PyFlightResultStream : public arrow::flight::ResultStream {
 public:
  /// \brief Construct a FlightResultStream from a Python object and callback.
  /// Must only be called while holding the GIL.
  explicit PyFlightResultStream(PyObject* generator,
                                PyFlightResultStreamCallback callback);
  arrow::Result<std::unique_ptr<arrow::flight::Result>> Next() override;

 private:
  OwnedRefNoGIL generator_;
  PyFlightResultStreamCallback callback_;
};

/// \brief A wrapper around a FlightDataStream that keeps alive a
/// Python object backing it.
class ARROW_PYFLIGHT_EXPORT PyFlightDataStream : public arrow::flight::FlightDataStream {
 public:
  /// \brief Construct a FlightDataStream from a Python object and underlying stream.
  /// Must only be called while holding the GIL.
  explicit PyFlightDataStream(PyObject* data_source,
                              std::unique_ptr<arrow::flight::FlightDataStream> stream);

  std::shared_ptr<Schema> schema() override;
  arrow::Result<arrow::flight::FlightPayload> GetSchemaPayload() override;
  arrow::Result<arrow::flight::FlightPayload> Next() override;

 private:
  OwnedRefNoGIL data_source_;
  std::unique_ptr<arrow::flight::FlightDataStream> stream_;
};

class ARROW_PYFLIGHT_EXPORT PyServerMiddlewareFactory
    : public arrow::flight::ServerMiddlewareFactory {
 public:
  /// \brief A callback to create the middleware instance in Python
  typedef std::function<Status(
      PyObject*, const arrow::flight::CallInfo& info,
      const arrow::flight::CallHeaders& incoming_headers,
      std::shared_ptr<arrow::flight::ServerMiddleware>* middleware)>
      StartCallCallback;

  /// \brief Must only be called while holding the GIL.
  explicit PyServerMiddlewareFactory(PyObject* factory, StartCallCallback start_call);

  Status StartCall(const arrow::flight::CallInfo& info,
                   const arrow::flight::CallHeaders& incoming_headers,
                   std::shared_ptr<arrow::flight::ServerMiddleware>* middleware) override;

 private:
  OwnedRefNoGIL factory_;
  StartCallCallback start_call_;
};

class ARROW_PYFLIGHT_EXPORT PyServerMiddleware : public arrow::flight::ServerMiddleware {
 public:
  typedef std::function<Status(PyObject*,
                               arrow::flight::AddCallHeaders* outgoing_headers)>
      SendingHeadersCallback;
  typedef std::function<Status(PyObject*, const Status& status)> CallCompletedCallback;

  struct Vtable {
    SendingHeadersCallback sending_headers;
    CallCompletedCallback call_completed;
  };

  /// \brief Must only be called while holding the GIL.
  explicit PyServerMiddleware(PyObject* middleware, Vtable vtable);

  void SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) override;
  void CallCompleted(const Status& status) override;
  std::string name() const override;
  /// \brief Get the underlying Python object.
  PyObject* py_object() const;

 private:
  OwnedRefNoGIL middleware_;
  Vtable vtable_;
};

class ARROW_PYFLIGHT_EXPORT PyClientMiddlewareFactory
    : public arrow::flight::ClientMiddlewareFactory {
 public:
  /// \brief A callback to create the middleware instance in Python
  typedef std::function<Status(
      PyObject*, const arrow::flight::CallInfo& info,
      std::unique_ptr<arrow::flight::ClientMiddleware>* middleware)>
      StartCallCallback;

  /// \brief Must only be called while holding the GIL.
  explicit PyClientMiddlewareFactory(PyObject* factory, StartCallCallback start_call);

  void StartCall(const arrow::flight::CallInfo& info,
                 std::unique_ptr<arrow::flight::ClientMiddleware>* middleware) override;

 private:
  OwnedRefNoGIL factory_;
  StartCallCallback start_call_;
};

class ARROW_PYFLIGHT_EXPORT PyClientMiddleware : public arrow::flight::ClientMiddleware {
 public:
  typedef std::function<Status(PyObject*,
                               arrow::flight::AddCallHeaders* outgoing_headers)>
      SendingHeadersCallback;
  typedef std::function<Status(PyObject*,
                               const arrow::flight::CallHeaders& incoming_headers)>
      ReceivedHeadersCallback;
  typedef std::function<Status(PyObject*, const Status& status)> CallCompletedCallback;

  struct Vtable {
    SendingHeadersCallback sending_headers;
    ReceivedHeadersCallback received_headers;
    CallCompletedCallback call_completed;
  };

  /// \brief Must only be called while holding the GIL.
  explicit PyClientMiddleware(PyObject* factory, Vtable vtable);

  void SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) override;
  void ReceivedHeaders(const arrow::flight::CallHeaders& incoming_headers) override;
  void CallCompleted(const Status& status) override;

 private:
  OwnedRefNoGIL middleware_;
  Vtable vtable_;
};

/// \brief A callback that obtains the next payload from a Flight result stream.
typedef std::function<Status(PyObject*, arrow::flight::FlightPayload*)>
    PyGeneratorFlightDataStreamCallback;

/// \brief A FlightDataStream built around a Python callback.
class ARROW_PYFLIGHT_EXPORT PyGeneratorFlightDataStream
    : public arrow::flight::FlightDataStream {
 public:
  /// \brief Construct a FlightDataStream from a Python object and underlying stream.
  /// Must only be called while holding the GIL.
  explicit PyGeneratorFlightDataStream(PyObject* generator,
                                       std::shared_ptr<arrow::Schema> schema,
                                       PyGeneratorFlightDataStreamCallback callback,
                                       const ipc::IpcWriteOptions& options);
  std::shared_ptr<Schema> schema() override;
  arrow::Result<arrow::flight::FlightPayload> GetSchemaPayload() override;
  arrow::Result<arrow::flight::FlightPayload> Next() override;

 private:
  OwnedRefNoGIL generator_;
  std::shared_ptr<arrow::Schema> schema_;
  ipc::DictionaryFieldMapper mapper_;
  ipc::IpcWriteOptions options_;
  PyGeneratorFlightDataStreamCallback callback_;
};

ARROW_PYFLIGHT_EXPORT
Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
                        const arrow::flight::FlightDescriptor& descriptor,
                        const std::vector<arrow::flight::FlightEndpoint>& endpoints,
                        int64_t total_records, int64_t total_bytes, bool ordered,
                        const std::string& app_metadata,
                        std::unique_ptr<arrow::flight::FlightInfo>* out);

/// \brief Create a SchemaResult from schema.
ARROW_PYFLIGHT_EXPORT
Status CreateSchemaResult(const std::shared_ptr<arrow::Schema>& schema,
                          std::unique_ptr<arrow::flight::SchemaResult>* out);

}  // namespace flight
}  // namespace py
}  // namespace arrow