// src/arrow/python/deserialize.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/python/deserialize.h"

#include "arrow/python/numpy_interop.h"

#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include <numpy/arrayobject.h>
#include <numpy/arrayscalars.h>

#include "arrow/array.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/options.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/util.h"
#include "arrow/ipc/writer.h"
#include "arrow/table.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/logging.h"
#include "arrow/util/value_parsing.h"

#include "arrow/python/common.h"
#include "arrow/python/datetime.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/pyarrow.h"
#include "arrow/python/serialize.h"
#include "arrow/python/vendored/pythoncapi_compat.h"

namespace arrow {

using internal::checked_cast;
using internal::ParseValue;

namespace py {

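// The container deserializers below recurse into one another via GetValue, and
// dicts carrying a "_pytype_" marker are routed through CallDeserializeCallback,
// hence the forward declarations.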
Status CallDeserializeCallback(PyObject* context, PyObject* value,
                               PyObject** deserialized_object);

Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx,
                        int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                        PyObject** out);

Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
                       int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                       PyObject** out);

Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
                      int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                      PyObject** out);

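// Rebuild a Python dict from a two-field StructArray whose children hold the
// serialized keys and values. If the reconstructed dict contains the "_pytype_"
// marker, it is handed to the deserialization callback so the original custom
// type can be restored.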
Status DeserializeDict(PyObject* context, const Array& array, int64_t start_idx,
                       int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                       PyObject** out) {
  const auto& data = checked_cast<const StructArray&>(array);
  OwnedRef keys, vals;
  OwnedRef result(PyDict_New());
  RETURN_IF_PYERROR();

  DCHECK_EQ(2, data.num_fields());

  RETURN_NOT_OK(DeserializeList(context, *data.field(0), start_idx, stop_idx, base, blobs,
                                keys.ref()));
  RETURN_NOT_OK(DeserializeList(context, *data.field(1), start_idx, stop_idx, base, blobs,
                                vals.ref()));
  for (int64_t i = start_idx; i < stop_idx; ++i) {
    // PyDict_SetItem behaves differently from PyList_SetItem and PyTuple_SetItem.
    // The latter two steal references whereas PyDict_SetItem does not. So we need
    // to make sure the reference count is decremented by letting the OwnedRef
    // go out of scope at the end.
    PyObject* key = PyList_GetItemRef(keys.obj(), i - start_idx);
    RETURN_IF_PYERROR();
    OwnedRef keyref(key);
    PyObject* val = PyList_GetItemRef(vals.obj(), i - start_idx);
    RETURN_IF_PYERROR();
    OwnedRef valref(val);
    int ret = PyDict_SetItem(result.obj(), key, val);
    if (ret != 0) {
      return ConvertPyError();
    }
  }
  static PyObject* py_type = PyUnicode_FromString("_pytype_");
  if (PyDict_Contains(result.obj(), py_type)) {
    RETURN_NOT_OK(CallDeserializeCallback(context, result.obj(), out));
  } else {
    *out = result.detach();
  }
  return Status::OK();
}

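// Rebuild a numpy ndarray from the serialized tensor at `index`, keeping `base`
// alive as the owner of the underlying memory, and mark the result read-only
// through its `flags.writeable` attribute.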
Status DeserializeArray(int32_t index, PyObject* base, const SerializedPyObject& blobs,
                        PyObject** out) {
  RETURN_NOT_OK(py::TensorToNdarray(blobs.ndarrays[index], base, out));
  // Mark the array as immutable
  OwnedRef flags(PyObject_GetAttrString(*out, "flags"));
  if (flags.obj() == NULL) {
    return ConvertPyError();
  }
  if (PyObject_SetAttrString(flags.obj(), "writeable", Py_False) < 0) {
    return ConvertPyError();
  }
  return Status::OK();
}

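// Reconstruct a single Python value from `arr` at `index`, dispatching on the
// PythonType union tag `type`: scalars are rebuilt directly, containers recurse
// through the Deserialize* helpers, and tensors, sparse tensors, ndarrays and
// buffers are looked up by position in `blobs`.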
Status GetValue(PyObject* context, const Array& arr, int64_t index, int8_t type,
                PyObject* base, const SerializedPyObject& blobs, PyObject** result) {
  switch (type) {
    case PythonType::NONE:
      Py_INCREF(Py_None);
      *result = Py_None;
      return Status::OK();
    case PythonType::BOOL:
      *result = PyBool_FromLong(checked_cast<const BooleanArray&>(arr).Value(index));
      return Status::OK();
    case PythonType::PY2INT:
    case PythonType::INT: {
      *result = PyLong_FromSsize_t(checked_cast<const Int64Array&>(arr).Value(index));
      return Status::OK();
    }
    case PythonType::BYTES: {
      auto view = checked_cast<const BinaryArray&>(arr).GetView(index);
      *result = PyBytes_FromStringAndSize(view.data(), view.length());
      return CheckPyError();
    }
    case PythonType::STRING: {
      auto view = checked_cast<const StringArray&>(arr).GetView(index);
      *result = PyUnicode_FromStringAndSize(view.data(), view.length());
      return CheckPyError();
    }
    case PythonType::HALF_FLOAT: {
      *result = PyHalf_FromHalf(checked_cast<const HalfFloatArray&>(arr).Value(index));
      RETURN_IF_PYERROR();
      return Status::OK();
    }
    case PythonType::FLOAT:
      *result = PyFloat_FromDouble(checked_cast<const FloatArray&>(arr).Value(index));
      return Status::OK();
    case PythonType::DOUBLE:
      *result = PyFloat_FromDouble(checked_cast<const DoubleArray&>(arr).Value(index));
      return Status::OK();
    case PythonType::DATE64: {
      RETURN_NOT_OK(internal::PyDateTime_from_int(
          checked_cast<const Date64Array&>(arr).Value(index), TimeUnit::MICRO, result));
      RETURN_IF_PYERROR();
      return Status::OK();
    }
    case PythonType::LIST: {
      const auto& l = checked_cast<const ListArray&>(arr);
      return DeserializeList(context, *l.values(), l.value_offset(index),
                             l.value_offset(index + 1), base, blobs, result);
    }
    case PythonType::DICT: {
      const auto& l = checked_cast<const ListArray&>(arr);
      return DeserializeDict(context, *l.values(), l.value_offset(index),
                             l.value_offset(index + 1), base, blobs, result);
    }
    case PythonType::TUPLE: {
      const auto& l = checked_cast<const ListArray&>(arr);
      return DeserializeTuple(context, *l.values(), l.value_offset(index),
                              l.value_offset(index + 1), base, blobs, result);
    }
    case PythonType::SET: {
      const auto& l = checked_cast<const ListArray&>(arr);
      return DeserializeSet(context, *l.values(), l.value_offset(index),
                            l.value_offset(index + 1), base, blobs, result);
    }
    case PythonType::TENSOR: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      *result = wrap_tensor(blobs.tensors[ref]);
      return Status::OK();
    }
    case PythonType::SPARSECOOTENSOR: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      const std::shared_ptr<SparseCOOTensor>& sparse_coo_tensor =
          arrow::internal::checked_pointer_cast<SparseCOOTensor>(
              blobs.sparse_tensors[ref]);
      *result = wrap_sparse_coo_tensor(sparse_coo_tensor);
      return Status::OK();
    }
    case PythonType::SPARSECSRMATRIX: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      const std::shared_ptr<SparseCSRMatrix>& sparse_csr_matrix =
          arrow::internal::checked_pointer_cast<SparseCSRMatrix>(
              blobs.sparse_tensors[ref]);
      *result = wrap_sparse_csr_matrix(sparse_csr_matrix);
      return Status::OK();
    }
    case PythonType::SPARSECSCMATRIX: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      const std::shared_ptr<SparseCSCMatrix>& sparse_csc_matrix =
          arrow::internal::checked_pointer_cast<SparseCSCMatrix>(
              blobs.sparse_tensors[ref]);
      *result = wrap_sparse_csc_matrix(sparse_csc_matrix);
      return Status::OK();
    }
    case PythonType::SPARSECSFTENSOR: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      const std::shared_ptr<SparseCSFTensor>& sparse_csf_tensor =
          arrow::internal::checked_pointer_cast<SparseCSFTensor>(
              blobs.sparse_tensors[ref]);
      *result = wrap_sparse_csf_tensor(sparse_csf_tensor);
      return Status::OK();
    }
    case PythonType::NDARRAY: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      return DeserializeArray(ref, base, blobs, result);
    }
    case PythonType::BUFFER: {
      int32_t ref = checked_cast<const Int32Array&>(arr).Value(index);
      *result = wrap_buffer(blobs.buffers[ref]);
      return Status::OK();
    }
    default: {
      ARROW_CHECK(false) << "union tag " << type << "' not recognized";
    }
  }
  return Status::OK();
}

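// Recover the PythonType tag of each union child: the tags are stored as
// decimal strings in the child field names.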
Status GetPythonTypes(const UnionArray& data, std::vector<int8_t>* result) {
  ARROW_CHECK(result != nullptr);
  auto type = data.type();
  for (int i = 0; i < type->num_fields(); ++i) {
    int8_t tag = 0;
    const std::string& data = type->field(i)->name();
    if (!ParseValue<Int8Type>(data.c_str(), data.size(), &tag)) {
      return Status::SerializationError("Cannot convert string: \"",
                                        type->field(i)->name(), "\" to int8_t");
    }
    result->push_back(tag);
  }
  return Status::OK();
}

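// Shared driver for list/tuple/set reconstruction: walk the dense union's type
// codes and value offsets, rebuild each element with GetValue, and hand it to
// the caller-supplied `set_item` functor on the container produced by
// `create_sequence`.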
template <typename CreateSequenceFn, typename SetItemFn>
Status DeserializeSequence(PyObject* context, const Array& array, int64_t start_idx,
                           int64_t stop_idx, PyObject* base,
                           const SerializedPyObject& blobs,
                           CreateSequenceFn&& create_sequence, SetItemFn&& set_item,
                           PyObject** out) {
  const auto& data = checked_cast<const DenseUnionArray&>(array);
  OwnedRef result(create_sequence(stop_idx - start_idx));
  RETURN_IF_PYERROR();
  const int8_t* type_codes = data.raw_type_codes();
  const int32_t* value_offsets = data.raw_value_offsets();
  std::vector<int8_t> python_types;
  RETURN_NOT_OK(GetPythonTypes(data, &python_types));
  for (int64_t i = start_idx; i < stop_idx; ++i) {
    const int64_t offset = value_offsets[i];
    const uint8_t type = type_codes[i];
    PyObject* value;
    RETURN_NOT_OK(GetValue(context, *data.field(type), offset, python_types[type], base,
                           blobs, &value));
    RETURN_NOT_OK(set_item(result.obj(), i - start_idx, value));
  }
  *out = result.detach();
  return Status::OK();
}

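// Lists and tuples are created pre-sized; PyList_SET_ITEM and PyTuple_SET_ITEM
// steal the reference returned by GetValue, so no extra DECREF is needed.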
Status DeserializeList(PyObject* context, const Array& array, int64_t start_idx,
                       int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                       PyObject** out) {
  return DeserializeSequence(
      context, array, start_idx, stop_idx, base, blobs,
      [](int64_t size) { return PyList_New(size); },
      [](PyObject* seq, int64_t index, PyObject* item) {
        PyList_SET_ITEM(seq, index, item);
        return Status::OK();
      },
      out);
}

Status DeserializeTuple(PyObject* context, const Array& array, int64_t start_idx,
                        int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                        PyObject** out) {
  return DeserializeSequence(
      context, array, start_idx, stop_idx, base, blobs,
      [](int64_t size) { return PyTuple_New(size); },
      [](PyObject* seq, int64_t index, PyObject* item) {
        PyTuple_SET_ITEM(seq, index, item);
        return Status::OK();
      },
      out);
}

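// Sets cannot be pre-sized, so the container starts empty; PySet_Add does not
// steal the reference, hence the explicit Py_DECREF after each insertion.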
Status DeserializeSet(PyObject* context, const Array& array, int64_t start_idx,
                      int64_t stop_idx, PyObject* base, const SerializedPyObject& blobs,
                      PyObject** out) {
  return DeserializeSequence(
      context, array, start_idx, stop_idx, base, blobs,
      [](int64_t size) { return PySet_New(nullptr); },
      [](PyObject* seq, int64_t index, PyObject* item) {
        int err = PySet_Add(seq, item);
        Py_DECREF(item);
        if (err < 0) {
          RETURN_IF_PYERROR();
        }
        return Status::OK();
      },
      out);
}

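// Read a SerializedPyObject back from a stream produced by the serializer:
// a header of four int32 counts (tensors, sparse tensors, ndarrays, buffers),
// the union record batch describing the object structure, then the aligned
// tensor, sparse tensor and ndarray bodies, and finally the raw buffers, each
// prefixed by its int64 length.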
Status ReadSerializedObject(io::RandomAccessFile* src, SerializedPyObject* out) {
  int32_t num_tensors;
  int32_t num_sparse_tensors;
  int32_t num_ndarrays;
  int32_t num_buffers;

  // Read number of tensors
  RETURN_NOT_OK(src->Read(sizeof(int32_t), reinterpret_cast<uint8_t*>(&num_tensors)));
  RETURN_NOT_OK(
      src->Read(sizeof(int32_t), reinterpret_cast<uint8_t*>(&num_sparse_tensors)));
  RETURN_NOT_OK(src->Read(sizeof(int32_t), reinterpret_cast<uint8_t*>(&num_ndarrays)));
  RETURN_NOT_OK(src->Read(sizeof(int32_t), reinterpret_cast<uint8_t*>(&num_buffers)));

  // Align stream to 8-byte offset
  RETURN_NOT_OK(ipc::AlignStream(src, ipc::kArrowIpcAlignment));
  std::shared_ptr<RecordBatchReader> reader;
  ARROW_ASSIGN_OR_RAISE(reader, ipc::RecordBatchStreamReader::Open(src));
  RETURN_NOT_OK(reader->ReadNext(&out->batch));

  // Skip EOS marker
  RETURN_NOT_OK(src->Advance(4));

  // Align stream so tensor bodies are 64-byte aligned
  RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));

  for (int i = 0; i < num_tensors; ++i) {
    std::shared_ptr<Tensor> tensor;
    ARROW_ASSIGN_OR_RAISE(tensor, ipc::ReadTensor(src));
    RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
    out->tensors.push_back(tensor);
  }

  for (int i = 0; i < num_sparse_tensors; ++i) {
    std::shared_ptr<SparseTensor> sparse_tensor;
    ARROW_ASSIGN_OR_RAISE(sparse_tensor, ipc::ReadSparseTensor(src));
    RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
    out->sparse_tensors.push_back(sparse_tensor);
  }

  for (int i = 0; i < num_ndarrays; ++i) {
    std::shared_ptr<Tensor> ndarray;
    ARROW_ASSIGN_OR_RAISE(ndarray, ipc::ReadTensor(src));
    RETURN_NOT_OK(ipc::AlignStream(src, ipc::kTensorAlignment));
    out->ndarrays.push_back(ndarray);
  }

  ARROW_ASSIGN_OR_RAISE(int64_t offset, src->Tell());
  for (int i = 0; i < num_buffers; ++i) {
    int64_t size;
    RETURN_NOT_OK(src->ReadAt(offset, sizeof(int64_t), &size));
    offset += sizeof(int64_t);
    ARROW_ASSIGN_OR_RAISE(auto buffer, src->ReadAt(offset, size));
    out->buffers.push_back(buffer);
    offset += size;
  }

  return Status::OK();
}

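// Top-level entry point: rebuild the original Python object from column 0 of
// the union record batch while holding the GIL. A minimal caller sketch
// (`src`, `context` and `base_obj` are placeholder names, not part of this
// file):
//
//   SerializedPyObject payload;
//   RETURN_NOT_OK(ReadSerializedObject(src, &payload));
//   PyObject* result = nullptr;
//   RETURN_NOT_OK(DeserializeObject(context, payload, base_obj, &result));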
Status DeserializeObject(PyObject* context, const SerializedPyObject& obj, PyObject* base,
                         PyObject** out) {
  PyAcquireGIL lock;
  return DeserializeList(context, *obj.batch->column(0), 0, obj.batch->num_rows(), base,
                         obj, out);
}

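// Reassemble a SerializedPyObject from a flat Python list of pyarrow Buffer
// objects laid out as: one buffer for the union batch, a metadata/body pair per
// tensor, a metadata buffer plus its body buffers per sparse tensor, a
// metadata/body pair per ndarray, and one buffer per plain buffer. The GIL is
// released while the record batch itself is decoded.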
Status GetSerializedFromComponents(int num_tensors,
                                   const SparseTensorCounts& num_sparse_tensors,
                                   int num_ndarrays, int num_buffers, PyObject* data,
                                   SerializedPyObject* out) {
  PyAcquireGIL gil;
  const Py_ssize_t data_length = PyList_Size(data);
  RETURN_IF_PYERROR();

  const Py_ssize_t expected_data_length = 1 + num_tensors * 2 +
                                          num_sparse_tensors.num_total_buffers() +
                                          num_ndarrays * 2 + num_buffers;
  if (data_length != expected_data_length) {
    return Status::Invalid("Invalid number of buffers in data");
  }

  auto GetBuffer = [&data](Py_ssize_t index, std::shared_ptr<Buffer>* out) {
    ARROW_CHECK_LT(index, PyList_Size(data));
    PyObject* py_buf = PyList_GetItemRef(data, index);
    RETURN_IF_PYERROR();
    OwnedRef py_buf_ref(py_buf);
    return unwrap_buffer(py_buf).Value(out);
  };

  Py_ssize_t buffer_index = 0;

  // Read the union batch describing object structure
  {
    std::shared_ptr<Buffer> data_buffer;
    RETURN_NOT_OK(GetBuffer(buffer_index++, &data_buffer));
    gil.release();
    io::BufferReader buf_reader(data_buffer);
    std::shared_ptr<RecordBatchReader> reader;
    ARROW_ASSIGN_OR_RAISE(reader, ipc::RecordBatchStreamReader::Open(&buf_reader));
    RETURN_NOT_OK(reader->ReadNext(&out->batch));
    gil.acquire();
  }

  // Zero-copy reconstruct tensors
  for (int i = 0; i < num_tensors; ++i) {
    std::shared_ptr<Buffer> metadata;
    std::shared_ptr<Buffer> body;
    std::shared_ptr<Tensor> tensor;
    RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata));
    RETURN_NOT_OK(GetBuffer(buffer_index++, &body));

    ipc::Message message(metadata, body);

    ARROW_ASSIGN_OR_RAISE(tensor, ipc::ReadTensor(message));
    out->tensors.emplace_back(std::move(tensor));
  }

  // Zero-copy reconstruct sparse tensors
  for (int i = 0, n = num_sparse_tensors.num_total_tensors(); i < n; ++i) {
    ipc::IpcPayload payload;
    RETURN_NOT_OK(GetBuffer(buffer_index++, &payload.metadata));

    ARROW_ASSIGN_OR_RAISE(
        size_t num_bodies,
        ipc::internal::ReadSparseTensorBodyBufferCount(*payload.metadata));

    payload.body_buffers.reserve(num_bodies);
    for (size_t i = 0; i < num_bodies; ++i) {
      std::shared_ptr<Buffer> body;
      RETURN_NOT_OK(GetBuffer(buffer_index++, &body));
      payload.body_buffers.emplace_back(body);
    }

    std::shared_ptr<SparseTensor> sparse_tensor;
    ARROW_ASSIGN_OR_RAISE(sparse_tensor, ipc::internal::ReadSparseTensorPayload(payload));
    out->sparse_tensors.emplace_back(std::move(sparse_tensor));
  }

  // Zero-copy reconstruct tensors for numpy ndarrays
  for (int i = 0; i < num_ndarrays; ++i) {
    std::shared_ptr<Buffer> metadata;
    std::shared_ptr<Buffer> body;
    std::shared_ptr<Tensor> tensor;
    RETURN_NOT_OK(GetBuffer(buffer_index++, &metadata));
    RETURN_NOT_OK(GetBuffer(buffer_index++, &body));

    ipc::Message message(metadata, body);

    ARROW_ASSIGN_OR_RAISE(tensor, ipc::ReadTensor(message));
    out->ndarrays.emplace_back(std::move(tensor));
  }

  // Unwrap and append buffers
  for (int i = 0; i < num_buffers; ++i) {
    std::shared_ptr<Buffer> buffer;
    RETURN_NOT_OK(GetBuffer(buffer_index++, &buffer));
    out->buffers.emplace_back(std::move(buffer));
  }

  return Status::OK();
}

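// Extract the single ndarray from a serialized payload; fails unless exactly
// one ndarray was serialized.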
Status DeserializeNdarray(const SerializedPyObject& object,
                          std::shared_ptr<Tensor>* out) {
  if (object.ndarrays.size() != 1) {
    return Status::Invalid("Object is not an Ndarray");
  }
  *out = object.ndarrays[0];
  return Status::OK();
}

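// Convenience wrapper: parse a complete serialized payload out of an in-memory
// buffer and return its single ndarray.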
Status NdarrayFromBuffer(std::shared_ptr<Buffer> src, std::shared_ptr<Tensor>* out) {
  io::BufferReader reader(src);
  SerializedPyObject object;
  RETURN_NOT_OK(ReadSerializedObject(&reader, &object));
  return DeserializeNdarray(object, out);
}

}  // namespace py
}  // namespace arrow