Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev246 

/ src / arrow / python / flight.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include <signal.h>
#include <utility>

#include "arrow/python/flight.h"
#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"

using arrow::flight::FlightPayload;

namespace arrow {
namespace py {
namespace flight {

const char* kPyServerMiddlewareName = "arrow.py_server_middleware";

PyServerAuthHandler::PyServerAuthHandler(PyObject* handler,
                                         const PyServerAuthHandlerVtable& vtable)
    : vtable_(vtable) {
  Py_INCREF(handler);
  handler_.reset(handler);
}

Status PyServerAuthHandler::Authenticate(arrow::flight::ServerAuthSender* outgoing,
                                         arrow::flight::ServerAuthReader* incoming) {
  return SafeCallIntoPython([=] {
    const Status status = vtable_.authenticate(handler_.obj(), outgoing, incoming);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyServerAuthHandler::IsValid(const std::string& token,
                                    std::string* peer_identity) {
  return SafeCallIntoPython([=] {
    const Status status = vtable_.is_valid(handler_.obj(), token, peer_identity);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

PyClientAuthHandler::PyClientAuthHandler(PyObject* handler,
                                         const PyClientAuthHandlerVtable& vtable)
    : vtable_(vtable) {
  Py_INCREF(handler);
  handler_.reset(handler);
}

Status PyClientAuthHandler::Authenticate(arrow::flight::ClientAuthSender* outgoing,
                                         arrow::flight::ClientAuthReader* incoming) {
  return SafeCallIntoPython([=] {
    const Status status = vtable_.authenticate(handler_.obj(), outgoing, incoming);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyClientAuthHandler::GetToken(std::string* token) {
  return SafeCallIntoPython([=] {
    const Status status = vtable_.get_token(handler_.obj(), token);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

PyFlightServer::PyFlightServer(PyObject* server, const PyFlightServerVtable& vtable)
    : vtable_(vtable) {
  Py_INCREF(server);
  server_.reset(server);
}

Status PyFlightServer::ListFlights(
    const arrow::flight::ServerCallContext& context,
    const arrow::flight::Criteria* criteria,
    std::unique_ptr<arrow::flight::FlightListing>* listings) {
  return SafeCallIntoPython([&] {
    const Status status =
        vtable_.list_flights(server_.obj(), context, criteria, listings);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::GetFlightInfo(const arrow::flight::ServerCallContext& context,
                                     const arrow::flight::FlightDescriptor& request,
                                     std::unique_ptr<arrow::flight::FlightInfo>* info) {
  return SafeCallIntoPython([&] {
    const Status status = vtable_.get_flight_info(server_.obj(), context, request, info);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::GetSchema(const arrow::flight::ServerCallContext& context,
                                 const arrow::flight::FlightDescriptor& request,
                                 std::unique_ptr<arrow::flight::SchemaResult>* result) {
  return SafeCallIntoPython([&] {
    const Status status = vtable_.get_schema(server_.obj(), context, request, result);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::DoGet(const arrow::flight::ServerCallContext& context,
                             const arrow::flight::Ticket& request,
                             std::unique_ptr<arrow::flight::FlightDataStream>* stream) {
  return SafeCallIntoPython([&] {
    const Status status = vtable_.do_get(server_.obj(), context, request, stream);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::DoPut(
    const arrow::flight::ServerCallContext& context,
    std::unique_ptr<arrow::flight::FlightMessageReader> reader,
    std::unique_ptr<arrow::flight::FlightMetadataWriter> writer) {
  return SafeCallIntoPython([&] {
    const Status status =
        vtable_.do_put(server_.obj(), context, std::move(reader), std::move(writer));
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::DoExchange(
    const arrow::flight::ServerCallContext& context,
    std::unique_ptr<arrow::flight::FlightMessageReader> reader,
    std::unique_ptr<arrow::flight::FlightMessageWriter> writer) {
  return SafeCallIntoPython([&] {
    const Status status =
        vtable_.do_exchange(server_.obj(), context, std::move(reader), std::move(writer));
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::DoAction(const arrow::flight::ServerCallContext& context,
                                const arrow::flight::Action& action,
                                std::unique_ptr<arrow::flight::ResultStream>* result) {
  return SafeCallIntoPython([&] {
    const Status status = vtable_.do_action(server_.obj(), context, action, result);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::ListActions(const arrow::flight::ServerCallContext& context,
                                   std::vector<arrow::flight::ActionType>* actions) {
  return SafeCallIntoPython([&] {
    const Status status = vtable_.list_actions(server_.obj(), context, actions);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

Status PyFlightServer::ServeWithSignals() {
  // Respect the current Python settings, i.e. only interrupt the server if there is
  // an active signal handler for SIGINT and SIGTERM.
  std::vector<int> signals;
  for (const int signum : {SIGINT, SIGTERM}) {
    ARROW_ASSIGN_OR_RAISE(auto handler, ::arrow::internal::GetSignalHandler(signum));
    auto cb = handler.callback();
    if (cb != SIG_DFL && cb != SIG_IGN) {
      signals.push_back(signum);
    }
  }
  RETURN_NOT_OK(SetShutdownOnSignals(signals));

  // Serve until we got told to shutdown or a signal interrupted us
  RETURN_NOT_OK(Serve());
  int signum = GotSignal();
  if (signum != 0) {
    // Issue the signal again with Python's signal handlers restored
    PyAcquireGIL lock;
    raise(signum);
    // XXX Ideally we would loop and serve again if no exception was raised.
    // Unfortunately, gRPC will return immediately if Serve() is called again.
    ARROW_UNUSED(PyErr_CheckSignals());
  }

  return Status::OK();
}

PyFlightResultStream::PyFlightResultStream(PyObject* generator,
                                           PyFlightResultStreamCallback callback)
    : callback_(callback) {
  Py_INCREF(generator);
  generator_.reset(generator);
}

arrow::Result<std::unique_ptr<arrow::flight::Result>> PyFlightResultStream::Next() {
  return SafeCallIntoPython(
      [=]() -> arrow::Result<std::unique_ptr<arrow::flight::Result>> {
        std::unique_ptr<arrow::flight::Result> result;
        const Status status = callback_(generator_.obj(), &result);
        RETURN_NOT_OK(CheckPyError());
        RETURN_NOT_OK(status);
        return result;
      });
}

PyFlightDataStream::PyFlightDataStream(
    PyObject* data_source, std::unique_ptr<arrow::flight::FlightDataStream> stream)
    : stream_(std::move(stream)) {
  Py_INCREF(data_source);
  data_source_.reset(data_source);
}

std::shared_ptr<Schema> PyFlightDataStream::schema() { return stream_->schema(); }

arrow::Result<FlightPayload> PyFlightDataStream::GetSchemaPayload() {
  return stream_->GetSchemaPayload();
}

arrow::Result<FlightPayload> PyFlightDataStream::Next() { return stream_->Next(); }

PyGeneratorFlightDataStream::PyGeneratorFlightDataStream(
    PyObject* generator, std::shared_ptr<arrow::Schema> schema,
    PyGeneratorFlightDataStreamCallback callback, const ipc::IpcWriteOptions& options)
    : schema_(schema), mapper_(*schema_), options_(options), callback_(callback) {
  Py_INCREF(generator);
  generator_.reset(generator);
}

std::shared_ptr<Schema> PyGeneratorFlightDataStream::schema() { return schema_; }

arrow::Result<FlightPayload> PyGeneratorFlightDataStream::GetSchemaPayload() {
  FlightPayload payload;
  RETURN_NOT_OK(ipc::GetSchemaPayload(*schema_, options_, mapper_, &payload.ipc_message));
  return payload;
}

arrow::Result<FlightPayload> PyGeneratorFlightDataStream::Next() {
  return SafeCallIntoPython([=]() -> arrow::Result<FlightPayload> {
    FlightPayload payload;
    const Status status = callback_(generator_.obj(), &payload);
    RETURN_NOT_OK(CheckPyError());
    RETURN_NOT_OK(status);
    return payload;
  });
}

// Flight Server Middleware

PyServerMiddlewareFactory::PyServerMiddlewareFactory(PyObject* factory,
                                                     StartCallCallback start_call)
    : start_call_(start_call) {
  Py_INCREF(factory);
  factory_.reset(factory);
}

Status PyServerMiddlewareFactory::StartCall(
    const arrow::flight::CallInfo& info,
    const arrow::flight::CallHeaders& incoming_headers,
    std::shared_ptr<arrow::flight::ServerMiddleware>* middleware) {
  return SafeCallIntoPython([&] {
    const Status status = start_call_(factory_.obj(), info, incoming_headers, middleware);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });
}

PyServerMiddleware::PyServerMiddleware(PyObject* middleware, Vtable vtable)
    : vtable_(vtable) {
  Py_INCREF(middleware);
  middleware_.reset(middleware);
}

void PyServerMiddleware::SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) {
  const Status& status = SafeCallIntoPython([&] {
    const Status status = vtable_.sending_headers(middleware_.obj(), outgoing_headers);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });

  ARROW_WARN_NOT_OK(status, "Python server middleware failed in SendingHeaders");
}

void PyServerMiddleware::CallCompleted(const Status& call_status) {
  const Status& status = SafeCallIntoPython([&] {
    const Status status = vtable_.call_completed(middleware_.obj(), call_status);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });

  ARROW_WARN_NOT_OK(status, "Python server middleware failed in CallCompleted");
}

std::string PyServerMiddleware::name() const { return kPyServerMiddlewareName; }

PyObject* PyServerMiddleware::py_object() const { return middleware_.obj(); }

// Flight Client Middleware

PyClientMiddlewareFactory::PyClientMiddlewareFactory(PyObject* factory,
                                                     StartCallCallback start_call)
    : start_call_(start_call) {
  Py_INCREF(factory);
  factory_.reset(factory);
}

void PyClientMiddlewareFactory::StartCall(
    const arrow::flight::CallInfo& info,
    std::unique_ptr<arrow::flight::ClientMiddleware>* middleware) {
  const Status& status = SafeCallIntoPython([&] {
    const Status status = start_call_(factory_.obj(), info, middleware);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });

  ARROW_WARN_NOT_OK(status, "Python client middleware failed in StartCall");
}

PyClientMiddleware::PyClientMiddleware(PyObject* middleware, Vtable vtable)
    : vtable_(vtable) {
  Py_INCREF(middleware);
  middleware_.reset(middleware);
}

void PyClientMiddleware::SendingHeaders(arrow::flight::AddCallHeaders* outgoing_headers) {
  const Status& status = SafeCallIntoPython([&] {
    const Status status = vtable_.sending_headers(middleware_.obj(), outgoing_headers);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });

  ARROW_WARN_NOT_OK(status, "Python client middleware failed in StartCall");
}

void PyClientMiddleware::ReceivedHeaders(
    const arrow::flight::CallHeaders& incoming_headers) {
  const Status& status = SafeCallIntoPython([&] {
    const Status status = vtable_.received_headers(middleware_.obj(), incoming_headers);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });

  ARROW_WARN_NOT_OK(status, "Python client middleware failed in StartCall");
}

void PyClientMiddleware::CallCompleted(const Status& call_status) {
  const Status& status = SafeCallIntoPython([&] {
    const Status status = vtable_.call_completed(middleware_.obj(), call_status);
    RETURN_NOT_OK(CheckPyError());
    return status;
  });

  ARROW_WARN_NOT_OK(status, "Python client middleware failed in StartCall");
}

Status CreateFlightInfo(const std::shared_ptr<arrow::Schema>& schema,
                        const arrow::flight::FlightDescriptor& descriptor,
                        const std::vector<arrow::flight::FlightEndpoint>& endpoints,
                        int64_t total_records, int64_t total_bytes, bool ordered,
                        const std::string& app_metadata,
                        std::unique_ptr<arrow::flight::FlightInfo>* out) {
  ARROW_ASSIGN_OR_RAISE(auto result, arrow::flight::FlightInfo::Make(
                                         *schema, descriptor, endpoints, total_records,
                                         total_bytes, ordered, app_metadata));
  *out = std::unique_ptr<arrow::flight::FlightInfo>(
      new arrow::flight::FlightInfo(std::move(result)));
  return Status::OK();
}

Status CreateSchemaResult(const std::shared_ptr<arrow::Schema>& schema,
                          std::unique_ptr<arrow::flight::SchemaResult>* out) {
  return arrow::flight::SchemaResult::Make(*schema).Value(out);
}

}  // namespace flight
}  // namespace py
}  // namespace arrow