Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev246 

/ src / arrow / python / serialize.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/python/serialize.h"
#include "arrow/python/numpy_interop.h"

#include <cstdint>
#include <limits>
#include <memory>
#include <sstream>
#include <string>
#include <vector>

#include <numpy/arrayobject.h>
#include <numpy/arrayscalars.h>

#include "arrow/array.h"
#include "arrow/array/builder_binary.h"
#include "arrow/array/builder_nested.h"
#include "arrow/array/builder_primitive.h"
#include "arrow/array/builder_union.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/ipc/util.h"
#include "arrow/ipc/writer.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/tensor.h"
#include "arrow/util/logging.h"

#include "arrow/python/common.h"
#include "arrow/python/datetime.h"
#include "arrow/python/helpers.h"
#include "arrow/python/iterators.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/platform.h"
#include "arrow/python/pyarrow.h"

constexpr int32_t kMaxRecursionDepth = 100;

namespace arrow {

using internal::checked_cast;

namespace py {

class SequenceBuilder;
class DictBuilder;

Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
              int32_t recursion_depth, SerializedPyObject* blobs_out);

// A Sequence is a heterogeneous collections of elements. It can contain
// scalar Python types, lists, tuples, dictionaries, tensors and sparse tensors.
class SequenceBuilder {
 public:
  explicit SequenceBuilder(MemoryPool* pool = default_memory_pool())
      : pool_(pool),
        types_(::arrow::int8(), pool),
        offsets_(::arrow::int32(), pool),
        type_map_(PythonType::NUM_PYTHON_TYPES, -1) {
    auto null_builder = std::make_shared<NullBuilder>(pool);
    auto initial_ty = dense_union({field("0", null())});
    builder_.reset(new DenseUnionBuilder(pool, {null_builder}, initial_ty));
  }

  // Appending a none to the sequence
  Status AppendNone() { return builder_->AppendNull(); }

  template <typename BuilderType, typename MakeBuilderFn>
  Status CreateAndUpdate(std::shared_ptr<BuilderType>* child_builder, int8_t tag,
                         MakeBuilderFn make_builder) {
    if (!*child_builder) {
      child_builder->reset(make_builder());
      std::ostringstream convert;
      convert.imbue(std::locale::classic());
      convert << static_cast<int>(tag);
      type_map_[tag] = builder_->AppendChild(*child_builder, convert.str());
    }
    return builder_->Append(type_map_[tag]);
  }

  template <typename BuilderType, typename T>
  Status AppendPrimitive(std::shared_ptr<BuilderType>* child_builder, const T val,
                         int8_t tag) {
    RETURN_NOT_OK(
        CreateAndUpdate(child_builder, tag, [this]() { return new BuilderType(pool_); }));
    return (*child_builder)->Append(val);
  }

  // Appending a boolean to the sequence
  Status AppendBool(const bool data) {
    return AppendPrimitive(&bools_, data, PythonType::BOOL);
  }

  // Appending an int64_t to the sequence
  Status AppendInt64(const int64_t data) {
    return AppendPrimitive(&ints_, data, PythonType::INT);
  }

  // Append a list of bytes to the sequence
  Status AppendBytes(const uint8_t* data, int32_t length) {
    RETURN_NOT_OK(CreateAndUpdate(&bytes_, PythonType::BYTES,
                                  [this]() { return new BinaryBuilder(pool_); }));
    return bytes_->Append(data, length);
  }

  // Appending a string to the sequence
  Status AppendString(const char* data, int32_t length) {
    RETURN_NOT_OK(CreateAndUpdate(&strings_, PythonType::STRING,
                                  [this]() { return new StringBuilder(pool_); }));
    return strings_->Append(data, length);
  }

  // Appending a half_float to the sequence
  Status AppendHalfFloat(const npy_half data) {
    return AppendPrimitive(&half_floats_, data, PythonType::HALF_FLOAT);
  }

  // Appending a float to the sequence
  Status AppendFloat(const float data) {
    return AppendPrimitive(&floats_, data, PythonType::FLOAT);
  }

  // Appending a double to the sequence
  Status AppendDouble(const double data) {
    return AppendPrimitive(&doubles_, data, PythonType::DOUBLE);
  }

  // Appending a Date64 timestamp to the sequence
  Status AppendDate64(const int64_t timestamp) {
    return AppendPrimitive(&date64s_, timestamp, PythonType::DATE64);
  }

  // Appending a tensor to the sequence
  //
  // \param tensor_index Index of the tensor in the object.
  Status AppendTensor(const int32_t tensor_index) {
    RETURN_NOT_OK(CreateAndUpdate(&tensor_indices_, PythonType::TENSOR,
                                  [this]() { return new Int32Builder(pool_); }));
    return tensor_indices_->Append(tensor_index);
  }

  // Appending a sparse coo tensor to the sequence
  //
  // \param sparse_coo_tensor_index Index of the sparse coo tensor in the object.
  Status AppendSparseCOOTensor(const int32_t sparse_coo_tensor_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_coo_tensor_indices_,
                                  PythonType::SPARSECOOTENSOR,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_coo_tensor_indices_->Append(sparse_coo_tensor_index);
  }

  // Appending a sparse csr matrix to the sequence
  //
  // \param sparse_csr_matrix_index Index of the sparse csr matrix in the object.
  Status AppendSparseCSRMatrix(const int32_t sparse_csr_matrix_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_csr_matrix_indices_,
                                  PythonType::SPARSECSRMATRIX,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_csr_matrix_indices_->Append(sparse_csr_matrix_index);
  }

  // Appending a sparse csc matrix to the sequence
  //
  // \param sparse_csc_matrix_index Index of the sparse csc matrix in the object.
  Status AppendSparseCSCMatrix(const int32_t sparse_csc_matrix_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_csc_matrix_indices_,
                                  PythonType::SPARSECSCMATRIX,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_csc_matrix_indices_->Append(sparse_csc_matrix_index);
  }

  // Appending a sparse csf tensor to the sequence
  //
  // \param sparse_csf_tensor_index Index of the sparse csf tensor in the object.
  Status AppendSparseCSFTensor(const int32_t sparse_csf_tensor_index) {
    RETURN_NOT_OK(CreateAndUpdate(&sparse_csf_tensor_indices_,
                                  PythonType::SPARSECSFTENSOR,
                                  [this]() { return new Int32Builder(pool_); }));
    return sparse_csf_tensor_indices_->Append(sparse_csf_tensor_index);
  }

  // Appending a numpy ndarray to the sequence
  //
  // \param tensor_index Index of the tensor in the object.
  Status AppendNdarray(const int32_t ndarray_index) {
    RETURN_NOT_OK(CreateAndUpdate(&ndarray_indices_, PythonType::NDARRAY,
                                  [this]() { return new Int32Builder(pool_); }));
    return ndarray_indices_->Append(ndarray_index);
  }

  // Appending a buffer to the sequence
  //
  // \param buffer_index Index of the buffer in the object.
  Status AppendBuffer(const int32_t buffer_index) {
    RETURN_NOT_OK(CreateAndUpdate(&buffer_indices_, PythonType::BUFFER,
                                  [this]() { return new Int32Builder(pool_); }));
    return buffer_indices_->Append(buffer_index);
  }

  Status AppendSequence(PyObject* context, PyObject* sequence, int8_t tag,
                        std::shared_ptr<ListBuilder>& target_sequence,
                        std::unique_ptr<SequenceBuilder>& values, int32_t recursion_depth,
                        SerializedPyObject* blobs_out) {
    if (recursion_depth >= kMaxRecursionDepth) {
      return Status::NotImplemented(
          "This object exceeds the maximum recursion depth. It may contain itself "
          "recursively.");
    }
    RETURN_NOT_OK(CreateAndUpdate(&target_sequence, tag, [this, &values]() {
      values.reset(new SequenceBuilder(pool_));
      return new ListBuilder(pool_, values->builder());
    }));
    RETURN_NOT_OK(target_sequence->Append());
    return internal::VisitIterable(
        sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
          return Append(context, obj, values.get(), recursion_depth, blobs_out);
        });
  }

  Status AppendList(PyObject* context, PyObject* list, int32_t recursion_depth,
                    SerializedPyObject* blobs_out) {
    return AppendSequence(context, list, PythonType::LIST, lists_, list_values_,
                          recursion_depth + 1, blobs_out);
  }

  Status AppendTuple(PyObject* context, PyObject* tuple, int32_t recursion_depth,
                     SerializedPyObject* blobs_out) {
    return AppendSequence(context, tuple, PythonType::TUPLE, tuples_, tuple_values_,
                          recursion_depth + 1, blobs_out);
  }

  Status AppendSet(PyObject* context, PyObject* set, int32_t recursion_depth,
                   SerializedPyObject* blobs_out) {
    return AppendSequence(context, set, PythonType::SET, sets_, set_values_,
                          recursion_depth + 1, blobs_out);
  }

  Status AppendDict(PyObject* context, PyObject* dict, int32_t recursion_depth,
                    SerializedPyObject* blobs_out);

  // Finish building the sequence and return the result.
  // Input arrays may be nullptr
  Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }

  std::shared_ptr<DenseUnionBuilder> builder() { return builder_; }

 private:
  MemoryPool* pool_;

  Int8Builder types_;
  Int32Builder offsets_;

  /// Mapping from PythonType to child index
  std::vector<int8_t> type_map_;

  std::shared_ptr<BooleanBuilder> bools_;
  std::shared_ptr<Int64Builder> ints_;
  std::shared_ptr<BinaryBuilder> bytes_;
  std::shared_ptr<StringBuilder> strings_;
  std::shared_ptr<HalfFloatBuilder> half_floats_;
  std::shared_ptr<FloatBuilder> floats_;
  std::shared_ptr<DoubleBuilder> doubles_;
  std::shared_ptr<Date64Builder> date64s_;

  std::unique_ptr<SequenceBuilder> list_values_;
  std::shared_ptr<ListBuilder> lists_;
  std::unique_ptr<DictBuilder> dict_values_;
  std::shared_ptr<ListBuilder> dicts_;
  std::unique_ptr<SequenceBuilder> tuple_values_;
  std::shared_ptr<ListBuilder> tuples_;
  std::unique_ptr<SequenceBuilder> set_values_;
  std::shared_ptr<ListBuilder> sets_;

  std::shared_ptr<Int32Builder> tensor_indices_;
  std::shared_ptr<Int32Builder> sparse_coo_tensor_indices_;
  std::shared_ptr<Int32Builder> sparse_csr_matrix_indices_;
  std::shared_ptr<Int32Builder> sparse_csc_matrix_indices_;
  std::shared_ptr<Int32Builder> sparse_csf_tensor_indices_;
  std::shared_ptr<Int32Builder> ndarray_indices_;
  std::shared_ptr<Int32Builder> buffer_indices_;

  std::shared_ptr<DenseUnionBuilder> builder_;
};

// Constructing dictionaries of key/value pairs. Sequences of
// keys and values are built separately using a pair of
// SequenceBuilders. The resulting Arrow representation
// can be obtained via the Finish method.
class DictBuilder {
 public:
  explicit DictBuilder(MemoryPool* pool = nullptr) : keys_(pool), vals_(pool) {
    builder_.reset(new StructBuilder(struct_({field("keys", dense_union(FieldVector{})),
                                              field("vals", dense_union(FieldVector{}))}),
                                     pool, {keys_.builder(), vals_.builder()}));
  }

  // Builder for the keys of the dictionary
  SequenceBuilder& keys() { return keys_; }
  // Builder for the values of the dictionary
  SequenceBuilder& vals() { return vals_; }

  // Construct an Arrow StructArray representing the dictionary.
  // Contains a field "keys" for the keys and "vals" for the values.
  Status Finish(std::shared_ptr<Array>* out) { return builder_->Finish(out); }

  std::shared_ptr<StructBuilder> builder() { return builder_; }

 private:
  SequenceBuilder keys_;
  SequenceBuilder vals_;
  std::shared_ptr<StructBuilder> builder_;
};

Status SequenceBuilder::AppendDict(PyObject* context, PyObject* dict,
                                   int32_t recursion_depth,
                                   SerializedPyObject* blobs_out) {
  if (recursion_depth >= kMaxRecursionDepth) {
    return Status::NotImplemented(
        "This object exceeds the maximum recursion depth. It may contain itself "
        "recursively.");
  }
  RETURN_NOT_OK(CreateAndUpdate(&dicts_, PythonType::DICT, [this]() {
    dict_values_.reset(new DictBuilder(pool_));
    return new ListBuilder(pool_, dict_values_->builder());
  }));
  RETURN_NOT_OK(dicts_->Append());
  PyObject* key;
  PyObject* value;
  Py_ssize_t pos = 0;
  while (PyDict_Next(dict, &pos, &key, &value)) {
    RETURN_NOT_OK(dict_values_->builder()->Append());
    RETURN_NOT_OK(
        Append(context, key, &dict_values_->keys(), recursion_depth + 1, blobs_out));
    RETURN_NOT_OK(
        Append(context, value, &dict_values_->vals(), recursion_depth + 1, blobs_out));
  }

  // This block is used to decrement the reference counts of the results
  // returned by the serialization callback, which is called in AppendArray,
  // in DeserializeDict and in Append
  static PyObject* py_type = PyUnicode_FromString("_pytype_");
  if (PyDict_Contains(dict, py_type)) {
    // If the dictionary contains the key "_pytype_", then the user has to
    // have registered a callback.
    if (context == Py_None) {
      return Status::Invalid("No serialization callback set");
    }
    Py_XDECREF(dict);
  }
  return Status::OK();
}

Status CallCustomCallback(PyObject* context, PyObject* method_name, PyObject* elem,
                          PyObject** result) {
  if (context == Py_None) {
    *result = NULL;
    return Status::SerializationError("error while calling callback on ",
                                      internal::PyObject_StdStringRepr(elem),
                                      ": handler not registered");
  } else {
    *result = PyObject_CallMethodObjArgs(context, method_name, elem, NULL);
    return CheckPyError();
  }
}

Status CallSerializeCallback(PyObject* context, PyObject* value,
                             PyObject** serialized_object) {
  OwnedRef method_name(PyUnicode_FromString("_serialize_callback"));
  RETURN_NOT_OK(CallCustomCallback(context, method_name.obj(), value, serialized_object));
  if (!PyDict_Check(*serialized_object)) {
    return Status::TypeError("serialization callback must return a valid dictionary");
  }
  return Status::OK();
}

Status CallDeserializeCallback(PyObject* context, PyObject* value,
                               PyObject** deserialized_object) {
  OwnedRef method_name(PyUnicode_FromString("_deserialize_callback"));
  return CallCustomCallback(context, method_name.obj(), value, deserialized_object);
}

Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                   int32_t recursion_depth, SerializedPyObject* blobs_out);

template <typename NumpyScalarObject>
Status AppendIntegerScalar(PyObject* obj, SequenceBuilder* builder) {
  int64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
  return builder->AppendInt64(value);
}

// Append a potentially 64-bit wide unsigned Numpy scalar.
// Must check for overflow as we reinterpret it as signed int64.
template <typename NumpyScalarObject>
Status AppendLargeUnsignedScalar(PyObject* obj, SequenceBuilder* builder) {
  constexpr uint64_t max_value = std::numeric_limits<int64_t>::max();

  uint64_t value = reinterpret_cast<NumpyScalarObject*>(obj)->obval;
  if (value > max_value) {
    return Status::Invalid("cannot serialize Numpy uint64 scalar >= 2**63");
  }
  return builder->AppendInt64(static_cast<int64_t>(value));
}

Status AppendScalar(PyObject* obj, SequenceBuilder* builder) {
  if (PyArray_IsScalar(obj, Bool)) {
    return builder->AppendBool(reinterpret_cast<PyBoolScalarObject*>(obj)->obval != 0);
  } else if (PyArray_IsScalar(obj, Half)) {
    return builder->AppendHalfFloat(reinterpret_cast<PyHalfScalarObject*>(obj)->obval);
  } else if (PyArray_IsScalar(obj, Float)) {
    return builder->AppendFloat(reinterpret_cast<PyFloatScalarObject*>(obj)->obval);
  } else if (PyArray_IsScalar(obj, Double)) {
    return builder->AppendDouble(reinterpret_cast<PyDoubleScalarObject*>(obj)->obval);
  }
  if (PyArray_IsScalar(obj, Byte)) {
    return AppendIntegerScalar<PyByteScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Short)) {
    return AppendIntegerScalar<PyShortScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Int)) {
    return AppendIntegerScalar<PyIntScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Long)) {
    return AppendIntegerScalar<PyLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, LongLong)) {
    return AppendIntegerScalar<PyLongLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, Int64)) {
    return AppendIntegerScalar<PyInt64ScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UByte)) {
    return AppendIntegerScalar<PyUByteScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UShort)) {
    return AppendIntegerScalar<PyUShortScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UInt)) {
    return AppendIntegerScalar<PyUIntScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, ULong)) {
    return AppendLargeUnsignedScalar<PyULongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, ULongLong)) {
    return AppendLargeUnsignedScalar<PyULongLongScalarObject>(obj, builder);
  } else if (PyArray_IsScalar(obj, UInt64)) {
    return AppendLargeUnsignedScalar<PyUInt64ScalarObject>(obj, builder);
  }
  return Status::NotImplemented("Numpy scalar type not recognized");
}

Status Append(PyObject* context, PyObject* elem, SequenceBuilder* builder,
              int32_t recursion_depth, SerializedPyObject* blobs_out) {
  // The bool case must precede the int case (PyInt_Check passes for bools)
  if (PyBool_Check(elem)) {
    RETURN_NOT_OK(builder->AppendBool(elem == Py_True));
  } else if (PyArray_DescrFromScalar(elem)->type_num == NPY_HALF) {
    npy_half halffloat = reinterpret_cast<PyHalfScalarObject*>(elem)->obval;
    RETURN_NOT_OK(builder->AppendHalfFloat(halffloat));
  } else if (PyFloat_Check(elem)) {
    RETURN_NOT_OK(builder->AppendDouble(PyFloat_AS_DOUBLE(elem)));
  } else if (PyLong_Check(elem)) {
    int overflow = 0;
    int64_t data = PyLong_AsLongLongAndOverflow(elem, &overflow);
    if (!overflow) {
      RETURN_NOT_OK(builder->AppendInt64(data));
    } else {
      // Attempt to serialize the object using the custom callback.
      PyObject* serialized_object;
      // The reference count of serialized_object will be decremented in SerializeDict
      RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
      RETURN_NOT_OK(
          builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
    }
  } else if (PyBytes_Check(elem)) {
    auto data = reinterpret_cast<uint8_t*>(PyBytes_AS_STRING(elem));
    int32_t size = -1;
    RETURN_NOT_OK(internal::CastSize(PyBytes_GET_SIZE(elem), &size));
    RETURN_NOT_OK(builder->AppendBytes(data, size));
  } else if (PyUnicode_Check(elem)) {
    ARROW_ASSIGN_OR_RAISE(auto view, PyBytesView::FromUnicode(elem));
    int32_t size = -1;
    RETURN_NOT_OK(internal::CastSize(view.size, &size));
    RETURN_NOT_OK(builder->AppendString(view.bytes, size));
  } else if (PyList_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendList(context, elem, recursion_depth, blobs_out));
  } else if (PyDict_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendDict(context, elem, recursion_depth, blobs_out));
  } else if (PyTuple_CheckExact(elem)) {
    RETURN_NOT_OK(builder->AppendTuple(context, elem, recursion_depth, blobs_out));
  } else if (PySet_Check(elem)) {
    RETURN_NOT_OK(builder->AppendSet(context, elem, recursion_depth, blobs_out));
  } else if (PyArray_IsScalar(elem, Generic)) {
    RETURN_NOT_OK(AppendScalar(elem, builder));
  } else if (PyArray_CheckExact(elem)) {
    RETURN_NOT_OK(AppendArray(context, reinterpret_cast<PyArrayObject*>(elem), builder,
                              recursion_depth, blobs_out));
  } else if (elem == Py_None) {
    RETURN_NOT_OK(builder->AppendNone());
  } else if (PyDateTime_Check(elem)) {
    PyDateTime_DateTime* datetime = reinterpret_cast<PyDateTime_DateTime*>(elem);
    RETURN_NOT_OK(builder->AppendDate64(internal::PyDateTime_to_us(datetime)));
  } else if (is_buffer(elem)) {
    RETURN_NOT_OK(builder->AppendBuffer(static_cast<int32_t>(blobs_out->buffers.size())));
    ARROW_ASSIGN_OR_RAISE(auto buffer, unwrap_buffer(elem));
    blobs_out->buffers.push_back(buffer);
  } else if (is_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendTensor(static_cast<int32_t>(blobs_out->tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_tensor(elem));
    blobs_out->tensors.push_back(tensor);
  } else if (is_sparse_coo_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCOOTensor(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_coo_tensor(elem));
    blobs_out->sparse_tensors.push_back(tensor);
  } else if (is_sparse_csr_matrix(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSRMatrix(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csr_matrix(elem));
    blobs_out->sparse_tensors.push_back(matrix);
  } else if (is_sparse_csc_matrix(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSCMatrix(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto matrix, unwrap_sparse_csc_matrix(elem));
    blobs_out->sparse_tensors.push_back(matrix);
  } else if (is_sparse_csf_tensor(elem)) {
    RETURN_NOT_OK(builder->AppendSparseCSFTensor(
        static_cast<int32_t>(blobs_out->sparse_tensors.size())));
    ARROW_ASSIGN_OR_RAISE(auto tensor, unwrap_sparse_csf_tensor(elem));
    blobs_out->sparse_tensors.push_back(tensor);
  } else {
    // Attempt to serialize the object using the custom callback.
    PyObject* serialized_object;
    // The reference count of serialized_object will be decremented in SerializeDict
    RETURN_NOT_OK(CallSerializeCallback(context, elem, &serialized_object));
    RETURN_NOT_OK(
        builder->AppendDict(context, serialized_object, recursion_depth, blobs_out));
  }
  return Status::OK();
}

Status AppendArray(PyObject* context, PyArrayObject* array, SequenceBuilder* builder,
                   int32_t recursion_depth, SerializedPyObject* blobs_out) {
  int dtype = PyArray_TYPE(array);
  switch (dtype) {
    case NPY_UINT8:
    case NPY_INT8:
    case NPY_UINT16:
    case NPY_INT16:
    case NPY_UINT32:
    case NPY_INT32:
    case NPY_UINT64:
    case NPY_INT64:
    case NPY_HALF:
    case NPY_FLOAT:
    case NPY_DOUBLE: {
      RETURN_NOT_OK(
          builder->AppendNdarray(static_cast<int32_t>(blobs_out->ndarrays.size())));
      std::shared_ptr<Tensor> tensor;
      RETURN_NOT_OK(NdarrayToTensor(default_memory_pool(),
                                    reinterpret_cast<PyObject*>(array), {}, &tensor));
      blobs_out->ndarrays.push_back(tensor);
    } break;
    default: {
      PyObject* serialized_object;
      // The reference count of serialized_object will be decremented in SerializeDict
      RETURN_NOT_OK(CallSerializeCallback(context, reinterpret_cast<PyObject*>(array),
                                          &serialized_object));
      RETURN_NOT_OK(builder->AppendDict(context, serialized_object, recursion_depth + 1,
                                        blobs_out));
    }
  }
  return Status::OK();
}

std::shared_ptr<RecordBatch> MakeBatch(std::shared_ptr<Array> data) {
  auto field = std::make_shared<Field>("list", data->type());
  auto schema = ::arrow::schema({field});
  return RecordBatch::Make(schema, data->length(), {data});
}

Status SerializeObject(PyObject* context, PyObject* sequence, SerializedPyObject* out) {
  PyAcquireGIL lock;
  SequenceBuilder builder;
  RETURN_NOT_OK(internal::VisitIterable(
      sequence, [&](PyObject* obj, bool* keep_going /* unused */) {
        return Append(context, obj, &builder, 0, out);
      }));
  std::shared_ptr<Array> array;
  RETURN_NOT_OK(builder.Finish(&array));
  out->batch = MakeBatch(array);
  return Status::OK();
}

Status SerializeNdarray(std::shared_ptr<Tensor> tensor, SerializedPyObject* out) {
  std::shared_ptr<Array> array;
  SequenceBuilder builder;
  RETURN_NOT_OK(builder.AppendNdarray(static_cast<int32_t>(out->ndarrays.size())));
  out->ndarrays.push_back(tensor);
  RETURN_NOT_OK(builder.Finish(&array));
  out->batch = MakeBatch(array);
  return Status::OK();
}

Status WriteNdarrayHeader(std::shared_ptr<DataType> dtype,
                          const std::vector<int64_t>& shape, int64_t tensor_num_bytes,
                          io::OutputStream* dst) {
  auto empty_tensor = std::make_shared<Tensor>(
      dtype, std::make_shared<Buffer>(nullptr, tensor_num_bytes), shape);
  SerializedPyObject serialized_tensor;
  RETURN_NOT_OK(SerializeNdarray(empty_tensor, &serialized_tensor));
  return serialized_tensor.WriteTo(dst);
}

SerializedPyObject::SerializedPyObject()
    : ipc_options(ipc::IpcWriteOptions::Defaults()) {}

Status SerializedPyObject::WriteTo(io::OutputStream* dst) {
  int32_t num_tensors = static_cast<int32_t>(this->tensors.size());
  int32_t num_sparse_tensors = static_cast<int32_t>(this->sparse_tensors.size());
  int32_t num_ndarrays = static_cast<int32_t>(this->ndarrays.size());
  int32_t num_buffers = static_cast<int32_t>(this->buffers.size());
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_tensors), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_sparse_tensors), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_ndarrays), sizeof(int32_t)));
  RETURN_NOT_OK(
      dst->Write(reinterpret_cast<const uint8_t*>(&num_buffers), sizeof(int32_t)));

  // Align stream to 8-byte offset
  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kArrowIpcAlignment));
  RETURN_NOT_OK(ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, dst));

  // Align stream to 64-byte offset so tensor bodies are 64-byte aligned
  RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));

  int32_t metadata_length;
  int64_t body_length;
  for (const auto& tensor : this->tensors) {
    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& sparse_tensor : this->sparse_tensors) {
    RETURN_NOT_OK(
        ipc::WriteSparseTensor(*sparse_tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& tensor : this->ndarrays) {
    RETURN_NOT_OK(ipc::WriteTensor(*tensor, dst, &metadata_length, &body_length));
    RETURN_NOT_OK(ipc::AlignStream(dst, ipc::kTensorAlignment));
  }

  for (const auto& buffer : this->buffers) {
    int64_t size = buffer->size();
    RETURN_NOT_OK(dst->Write(reinterpret_cast<const uint8_t*>(&size), sizeof(int64_t)));
    RETURN_NOT_OK(dst->Write(buffer->data(), size));
  }

  return Status::OK();
}

namespace {

Status CountSparseTensors(
    const std::vector<std::shared_ptr<SparseTensor>>& sparse_tensors, PyObject** out) {
  OwnedRef num_sparse_tensors(PyDict_New());
  size_t num_coo = 0;
  size_t num_csr = 0;
  size_t num_csc = 0;
  size_t num_csf = 0;
  size_t ndim_csf = 0;

  for (const auto& sparse_tensor : sparse_tensors) {
    switch (sparse_tensor->format_id()) {
      case SparseTensorFormat::COO:
        ++num_coo;
        break;
      case SparseTensorFormat::CSR:
        ++num_csr;
        break;
      case SparseTensorFormat::CSC:
        ++num_csc;
        break;
      case SparseTensorFormat::CSF:
        ++num_csf;
        ndim_csf += sparse_tensor->ndim();
        break;
    }
  }

  PyDict_SetItemString(num_sparse_tensors.obj(), "coo", PyLong_FromSize_t(num_coo));
  PyDict_SetItemString(num_sparse_tensors.obj(), "csr", PyLong_FromSize_t(num_csr));
  PyDict_SetItemString(num_sparse_tensors.obj(), "csc", PyLong_FromSize_t(num_csc));
  PyDict_SetItemString(num_sparse_tensors.obj(), "csf", PyLong_FromSize_t(num_csf));
  PyDict_SetItemString(num_sparse_tensors.obj(), "ndim_csf", PyLong_FromSize_t(ndim_csf));
  RETURN_IF_PYERROR();

  *out = num_sparse_tensors.detach();
  return Status::OK();
}

}  // namespace

Status SerializedPyObject::GetComponents(MemoryPool* memory_pool, PyObject** out) {
  PyAcquireGIL py_gil;

  OwnedRef result(PyDict_New());
  PyObject* buffers = PyList_New(0);
  PyObject* num_sparse_tensors = nullptr;

  // TODO(wesm): Not sure how pedantic we need to be about checking the return
  // values of these functions. There are other places where we do not check
  // PyDict_SetItem/SetItemString return value, but these failures would be
  // quite esoteric
  PyDict_SetItemString(result.obj(), "num_tensors",
                       PyLong_FromSize_t(this->tensors.size()));
  RETURN_NOT_OK(CountSparseTensors(this->sparse_tensors, &num_sparse_tensors));
  PyDict_SetItemString(result.obj(), "num_sparse_tensors", num_sparse_tensors);
  PyDict_SetItemString(result.obj(), "ndim_csf", num_sparse_tensors);
  PyDict_SetItemString(result.obj(), "num_ndarrays",
                       PyLong_FromSize_t(this->ndarrays.size()));
  PyDict_SetItemString(result.obj(), "num_buffers",
                       PyLong_FromSize_t(this->buffers.size()));
  PyDict_SetItemString(result.obj(), "data", buffers);
  RETURN_IF_PYERROR();

  Py_DECREF(buffers);

  auto PushBuffer = [&buffers](const std::shared_ptr<Buffer>& buffer) {
    PyObject* wrapped_buffer = wrap_buffer(buffer);
    RETURN_IF_PYERROR();
    if (PyList_Append(buffers, wrapped_buffer) < 0) {
      Py_DECREF(wrapped_buffer);
      RETURN_IF_PYERROR();
    }
    Py_DECREF(wrapped_buffer);
    return Status::OK();
  };

  constexpr int64_t kInitialCapacity = 1024;

  // Write the record batch describing the object structure
  py_gil.release();
  ARROW_ASSIGN_OR_RAISE(auto stream,
                        io::BufferOutputStream::Create(kInitialCapacity, memory_pool));
  RETURN_NOT_OK(
      ipc::WriteRecordBatchStream({this->batch}, this->ipc_options, stream.get()));
  ARROW_ASSIGN_OR_RAISE(auto buffer, stream->Finish());
  py_gil.acquire();

  RETURN_NOT_OK(PushBuffer(buffer));

  // For each tensor, get a metadata buffer and a buffer for the body
  for (const auto& tensor : this->tensors) {
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
                          ipc::GetTensorMessage(*tensor, memory_pool));
    RETURN_NOT_OK(PushBuffer(message->metadata()));
    RETURN_NOT_OK(PushBuffer(message->body()));
  }

  // For each sparse tensor, get a metadata buffer and buffers containing index and data
  for (const auto& sparse_tensor : this->sparse_tensors) {
    ipc::IpcPayload payload;
    RETURN_NOT_OK(ipc::GetSparseTensorPayload(*sparse_tensor, memory_pool, &payload));
    RETURN_NOT_OK(PushBuffer(payload.metadata));
    for (const auto& body : payload.body_buffers) {
      RETURN_NOT_OK(PushBuffer(body));
    }
  }

  // For each ndarray, get a metadata buffer and a buffer for the body
  for (const auto& ndarray : this->ndarrays) {
    ARROW_ASSIGN_OR_RAISE(std::unique_ptr<ipc::Message> message,
                          ipc::GetTensorMessage(*ndarray, memory_pool));
    RETURN_NOT_OK(PushBuffer(message->metadata()));
    RETURN_NOT_OK(PushBuffer(message->body()));
  }

  for (const auto& buf : this->buffers) {
    RETURN_NOT_OK(PushBuffer(buf));
  }

  *out = result.detach();
  return Status::OK();
}

}  // namespace py
}  // namespace arrow