Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev246 

/ src / arrow / python / arrow_to_pandas.cc

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// Functions for pandas conversion via NumPy

#include "arrow/python/arrow_to_pandas.h"
#include "arrow/python/numpy_interop.h"  // IWYU pragma: expand

#include <cmath>
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

#include "arrow/array.h"
#include "arrow/buffer.h"
#include "arrow/datum.h"
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_traits.h"
#include "arrow/util/checked_cast.h"
#include "arrow/util/hashing.h"
#include "arrow/util/int_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/macros.h"
#include "arrow/util/parallel.h"
#include "arrow/visit_type_inline.h"

#include "arrow/compute/api.h"

#include "arrow/python/arrow_to_python_internal.h"
#include "arrow/python/common.h"
#include "arrow/python/datetime.h"
#include "arrow/python/decimal.h"
#include "arrow/python/helpers.h"
#include "arrow/python/numpy_convert.h"
#include "arrow/python/numpy_internal.h"
#include "arrow/python/pyarrow.h"
#include "arrow/python/python_to_arrow.h"
#include "arrow/python/type_traits.h"

namespace arrow {

class MemoryPool;

using internal::checked_cast;
using internal::CheckIndexBounds;
using internal::OptionalParallelFor;

namespace py {
namespace {

// Fix options for conversion of an inner (child) array.
PandasOptions MakeInnerOptions(PandasOptions options) {
  // Make sure conversion of inner dictionary arrays always returns an array,
  // not a dict {'indices': array, 'dictionary': array, 'ordered': bool}
  options.decode_dictionaries = true;
  options.categorical_columns.clear();
  options.strings_to_categorical = false;

  // In ARROW-7723, we found as a result of ARROW-3789 that second
  // through microsecond resolution tz-aware timestamps were being promoted to
  // use the DATETIME_NANO_TZ conversion path, yielding a datetime64[ns] NumPy
  // array in this function. PyArray_GETITEM returns datetime.datetime for
  // units second through microsecond but PyLong for nanosecond (because
  // datetime.datetime does not support nanoseconds).
  // We force the object conversion to preserve the value of the timezone.
  // Nanoseconds are returned as integers.
  options.coerce_temporal_nanoseconds = false;

  return options;
}

// ----------------------------------------------------------------------
// PyCapsule code for setting ndarray base to reference C++ object

struct ArrayCapsule {
  std::shared_ptr<Array> array;
};

struct BufferCapsule {
  std::shared_ptr<Buffer> buffer;
};

void ArrayCapsule_Destructor(PyObject* capsule) {
  delete reinterpret_cast<ArrayCapsule*>(PyCapsule_GetPointer(capsule, "arrow::Array"));
}

void BufferCapsule_Destructor(PyObject* capsule) {
  delete reinterpret_cast<BufferCapsule*>(PyCapsule_GetPointer(capsule, "arrow::Buffer"));
}

// ----------------------------------------------------------------------
// pandas 0.x DataFrame conversion internals

using internal::arrow_traits;
using internal::npy_traits;

template <typename T>
struct WrapBytes {};

template <>
struct WrapBytes<StringType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyUnicode_FromStringAndSize(data, length);
  }
};

template <>
struct WrapBytes<LargeStringType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyUnicode_FromStringAndSize(data, length);
  }
};

template <>
struct WrapBytes<StringViewType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyUnicode_FromStringAndSize(data, length);
  }
};

template <>
struct WrapBytes<BinaryType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyBytes_FromStringAndSize(data, length);
  }
};

template <>
struct WrapBytes<LargeBinaryType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyBytes_FromStringAndSize(data, length);
  }
};

template <>
struct WrapBytes<BinaryViewType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyBytes_FromStringAndSize(data, length);
  }
};

template <>
struct WrapBytes<FixedSizeBinaryType> {
  static inline PyObject* Wrap(const char* data, int64_t length) {
    return PyBytes_FromStringAndSize(data, length);
  }
};

static inline bool ListTypeSupported(const DataType& type) {
  switch (type.id()) {
    case Type::BOOL:
    case Type::UINT8:
    case Type::INT8:
    case Type::UINT16:
    case Type::INT16:
    case Type::UINT32:
    case Type::INT32:
    case Type::INT64:
    case Type::UINT64:
    case Type::HALF_FLOAT:
    case Type::FLOAT:
    case Type::DOUBLE:
    case Type::DECIMAL128:
    case Type::DECIMAL256:
    case Type::BINARY:
    case Type::LARGE_BINARY:
    case Type::STRING:
    case Type::LARGE_STRING:
    case Type::DATE32:
    case Type::DATE64:
    case Type::STRUCT:
    case Type::MAP:
    case Type::TIME32:
    case Type::TIME64:
    case Type::TIMESTAMP:
    case Type::DURATION:
    case Type::DICTIONARY:
    case Type::INTERVAL_MONTH_DAY_NANO:
    case Type::NA:  // empty list
      // The above types are all supported.
      return true;
    case Type::FIXED_SIZE_LIST:
    case Type::LIST:
    case Type::LARGE_LIST:
    case Type::LIST_VIEW:
    case Type::LARGE_LIST_VIEW: {
      const auto& list_type = checked_cast<const BaseListType&>(type);
      return ListTypeSupported(*list_type.value_type());
    }
    case Type::EXTENSION: {
      const auto& ext = checked_cast<const ExtensionType&>(*type.GetSharedPtr());
      return ListTypeSupported(*(ext.storage_type()));
    }
    default:
      break;
  }
  return false;
}

Status CapsulizeArray(const std::shared_ptr<Array>& arr, PyObject** out) {
  auto capsule = new ArrayCapsule{{arr}};
  *out = PyCapsule_New(reinterpret_cast<void*>(capsule), "arrow::Array",
                       &ArrayCapsule_Destructor);
  if (*out == nullptr) {
    delete capsule;
    RETURN_IF_PYERROR();
  }
  return Status::OK();
}

Status CapsulizeBuffer(const std::shared_ptr<Buffer>& buffer, PyObject** out) {
  auto capsule = new BufferCapsule{{buffer}};
  *out = PyCapsule_New(reinterpret_cast<void*>(capsule), "arrow::Buffer",
                       &BufferCapsule_Destructor);
  if (*out == nullptr) {
    delete capsule;
    RETURN_IF_PYERROR();
  }
  return Status::OK();
}

Status SetNdarrayBase(PyArrayObject* arr, PyObject* base) {
  if (PyArray_SetBaseObject(arr, base) == -1) {
    // Error occurred, trust that SetBaseObject sets the error state
    Py_XDECREF(base);
    RETURN_IF_PYERROR();
  }
  return Status::OK();
}

Status SetBufferBase(PyArrayObject* arr, const std::shared_ptr<Buffer>& buffer) {
  PyObject* base;
  RETURN_NOT_OK(CapsulizeBuffer(buffer, &base));
  return SetNdarrayBase(arr, base);
}

inline void set_numpy_metadata(int type, const DataType* datatype, PyArray_Descr* out) {
  auto metadata =
      reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(PyDataType_C_METADATA(out));
  if (type == NPY_DATETIME) {
    if (datatype->id() == Type::TIMESTAMP) {
      const auto& timestamp_type = checked_cast<const TimestampType&>(*datatype);
      metadata->meta.base = internal::NumPyFrequency(timestamp_type.unit());
    } else {
      DCHECK(false) << "NPY_DATETIME views only supported for Arrow TIMESTAMP types";
    }
  } else if (type == NPY_TIMEDELTA) {
    DCHECK_EQ(datatype->id(), Type::DURATION);
    const auto& duration_type = checked_cast<const DurationType&>(*datatype);
    metadata->meta.base = internal::NumPyFrequency(duration_type.unit());
  }
}

Status PyArray_NewFromPool(int nd, npy_intp* dims, PyArray_Descr* descr, MemoryPool* pool,
                           PyObject** out) {
  // ARROW-6570: Allocate memory from MemoryPool for a couple reasons
  //
  // * Track allocations
  // * Get better performance through custom allocators
  int64_t total_size = PyDataType_ELSIZE(descr);
  for (int i = 0; i < nd; ++i) {
    total_size *= dims[i];
  }

  ARROW_ASSIGN_OR_RAISE(auto buffer, AllocateBuffer(total_size, pool));
  *out = PyArray_NewFromDescr(&PyArray_Type, descr, nd, dims,
                              /*strides=*/nullptr,
                              /*data=*/buffer->mutable_data(),
                              /*flags=*/NPY_ARRAY_CARRAY | NPY_ARRAY_WRITEABLE,
                              /*obj=*/nullptr);
  if (*out == nullptr) {
    RETURN_IF_PYERROR();
    // Trust that error set if NULL returned
  }
  return SetBufferBase(reinterpret_cast<PyArrayObject*>(*out), std::move(buffer));
}

template <typename T = void>
inline const T* GetPrimitiveValues(const Array& arr) {
  if (arr.length() == 0) {
    return nullptr;
  }
  const int elsize = arr.type()->byte_width();
  const auto& prim_arr = checked_cast<const PrimitiveArray&>(arr);
  return reinterpret_cast<const T*>(prim_arr.values()->data() + arr.offset() * elsize);
}

Status MakeNumPyView(std::shared_ptr<Array> arr, PyObject* py_ref, int npy_type, int ndim,
                     npy_intp* dims, PyObject** out) {
  PyAcquireGIL lock;

  PyArray_Descr* descr = internal::GetSafeNumPyDtype(npy_type);
  set_numpy_metadata(npy_type, arr->type().get(), descr);
  PyObject* result = PyArray_NewFromDescr(
      &PyArray_Type, descr, ndim, dims, /*strides=*/nullptr,
      const_cast<void*>(GetPrimitiveValues(*arr)), /*flags=*/0, nullptr);
  PyArrayObject* np_arr = reinterpret_cast<PyArrayObject*>(result);
  if (np_arr == nullptr) {
    // Error occurred, trust that error set
    return Status::OK();
  }

  PyObject* base;
  if (py_ref == nullptr) {
    // Capsule will be owned by the ndarray, no incref necessary. See
    // ARROW-1973
    RETURN_NOT_OK(CapsulizeArray(arr, &base));
  } else {
    Py_INCREF(py_ref);
    base = py_ref;
  }
  RETURN_NOT_OK(SetNdarrayBase(np_arr, base));

  // Do not allow Arrow data to be mutated
  PyArray_CLEARFLAGS(np_arr, NPY_ARRAY_WRITEABLE);
  *out = result;
  return Status::OK();
}

class PandasWriter {
 public:
  enum type {
    OBJECT,
    UINT8,
    INT8,
    UINT16,
    INT16,
    UINT32,
    INT32,
    UINT64,
    INT64,
    HALF_FLOAT,
    FLOAT,
    DOUBLE,
    BOOL,
    DATETIME_DAY,
    DATETIME_SECOND,
    DATETIME_MILLI,
    DATETIME_MICRO,
    DATETIME_NANO,
    DATETIME_SECOND_TZ,
    DATETIME_MILLI_TZ,
    DATETIME_MICRO_TZ,
    DATETIME_NANO_TZ,
    TIMEDELTA_SECOND,
    TIMEDELTA_MILLI,
    TIMEDELTA_MICRO,
    TIMEDELTA_NANO,
    CATEGORICAL,
    EXTENSION
  };

  PandasWriter(const PandasOptions& options, int64_t num_rows, int num_columns)
      : options_(options), num_rows_(num_rows), num_columns_(num_columns) {
    PyAcquireGIL lock;
    internal::InitPandasStaticData();
  }
  virtual ~PandasWriter() {}

  void SetBlockData(PyObject* arr) {
    block_arr_.reset(arr);
    block_data_ =
        reinterpret_cast<uint8_t*>(PyArray_DATA(reinterpret_cast<PyArrayObject*>(arr)));
  }

  /// \brief Either copy or wrap single array to create pandas-compatible array
  /// for Series or DataFrame. num_columns_ can only be 1. Will try to zero
  /// copy if possible (or error if not possible and zero_copy_only=True)
  virtual Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) = 0;

  /// \brief Copy ChunkedArray into a multi-column block
  virtual Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) = 0;

  Status EnsurePlacementAllocated() {
    std::lock_guard<std::mutex> guard(allocation_lock_);
    if (placement_data_ != nullptr) {
      return Status::OK();
    }
    PyAcquireGIL lock;
    npy_intp placement_dims[1] = {num_columns_};
    PyObject* placement_arr = PyArray_SimpleNew(1, placement_dims, NPY_INT64);
    RETURN_IF_PYERROR();
    placement_arr_.reset(placement_arr);
    placement_data_ = reinterpret_cast<int64_t*>(
        PyArray_DATA(reinterpret_cast<PyArrayObject*>(placement_arr)));
    return Status::OK();
  }

  Status EnsureAllocated() {
    std::lock_guard<std::mutex> guard(allocation_lock_);
    if (block_data_ != nullptr) {
      return Status::OK();
    }
    RETURN_NOT_OK(Allocate());
    return Status::OK();
  }

  virtual bool CanZeroCopy(const ChunkedArray& data) const { return false; }

  virtual Status Write(std::shared_ptr<ChunkedArray> data, int64_t abs_placement,
                       int64_t rel_placement) {
    RETURN_NOT_OK(EnsurePlacementAllocated());
    if (num_columns_ == 1 && options_.allow_zero_copy_blocks) {
      RETURN_NOT_OK(TransferSingle(data, /*py_ref=*/nullptr));
    } else {
      RETURN_NOT_OK(
          CheckNoZeroCopy("Cannot do zero copy conversion into "
                          "multi-column DataFrame block"));
      RETURN_NOT_OK(EnsureAllocated());
      RETURN_NOT_OK(CopyInto(data, rel_placement));
    }
    placement_data_[rel_placement] = abs_placement;
    return Status::OK();
  }

  virtual Status GetDataFrameResult(PyObject** out) {
    PyObject* result = PyDict_New();
    RETURN_IF_PYERROR();

    PyObject* block;
    RETURN_NOT_OK(GetResultBlock(&block));

    PyDict_SetItemString(result, "block", block);
    PyDict_SetItemString(result, "placement", placement_arr_.obj());

    RETURN_NOT_OK(AddResultMetadata(result));
    *out = result;
    return Status::OK();
  }

  // Caller steals the reference to this object
  virtual Status GetSeriesResult(PyObject** out) {
    RETURN_NOT_OK(MakeBlock1D());
    // Caller owns the object now
    *out = block_arr_.detach();
    return Status::OK();
  }

 protected:
  virtual Status AddResultMetadata(PyObject* result) { return Status::OK(); }

  Status MakeBlock1D() {
    // For Series or for certain DataFrame block types, we need to shape to a
    // 1D array when there is only one column
    PyAcquireGIL lock;

    DCHECK_EQ(1, num_columns_);

    npy_intp new_dims[1] = {static_cast<npy_intp>(num_rows_)};
    PyArray_Dims dims;
    dims.ptr = new_dims;
    dims.len = 1;

    PyObject* reshaped = PyArray_Newshape(
        reinterpret_cast<PyArrayObject*>(block_arr_.obj()), &dims, NPY_ANYORDER);
    RETURN_IF_PYERROR();

    // ARROW-8801: Here a PyArrayObject is created that is not being managed by
    // any OwnedRef object. This object is then put in the resulting object
    // with PyDict_SetItemString, which increments the reference count, so a
    // memory leak ensues. There are several ways to fix the memory leak but a
    // simple one is to put the reshaped 1D block array in this OwnedRefNoGIL
    // so it will be correctly decref'd when this class is destructed.
    block_arr_.reset(reshaped);
    return Status::OK();
  }

  virtual Status GetResultBlock(PyObject** out) {
    *out = block_arr_.obj();
    return Status::OK();
  }

  Status CheckNoZeroCopy(const std::string& message) {
    if (options_.zero_copy_only) {
      return Status::Invalid(message);
    }
    return Status::OK();
  }

  Status CheckNotZeroCopyOnly(const ChunkedArray& data) {
    if (options_.zero_copy_only) {
      return Status::Invalid("Needed to copy ", data.num_chunks(), " chunks with ",
                             data.null_count(), " nulls, but zero_copy_only was True");
    }
    return Status::OK();
  }

  virtual Status Allocate() {
    return Status::NotImplemented("Override Allocate in subclasses");
  }

  Status AllocateNDArray(int npy_type, int ndim = 2) {
    PyAcquireGIL lock;

    PyObject* block_arr = nullptr;
    npy_intp block_dims[2] = {0, 0};

    if (ndim == 2) {
      block_dims[0] = num_columns_;
      block_dims[1] = num_rows_;
    } else {
      block_dims[0] = num_rows_;
    }
    PyArray_Descr* descr = internal::GetSafeNumPyDtype(npy_type);
    if (PyDataType_REFCHK(descr)) {
      // ARROW-6876: if the array has refcounted items, let Numpy
      // own the array memory so as to decref elements on array destruction
      block_arr = PyArray_SimpleNewFromDescr(ndim, block_dims, descr);
      RETURN_IF_PYERROR();
    } else {
      RETURN_NOT_OK(
          PyArray_NewFromPool(ndim, block_dims, descr, options_.pool, &block_arr));
    }

    SetBlockData(block_arr);
    return Status::OK();
  }

  void SetDatetimeUnit(NPY_DATETIMEUNIT unit) {
    PyAcquireGIL lock;
    auto date_dtype =
        reinterpret_cast<PyArray_DatetimeDTypeMetaData*>(PyDataType_C_METADATA(
            PyArray_DESCR(reinterpret_cast<PyArrayObject*>(block_arr_.obj()))));
    date_dtype->meta.base = unit;
  }

  PandasOptions options_;

  std::mutex allocation_lock_;

  int64_t num_rows_;
  int num_columns_;

  OwnedRefNoGIL block_arr_;
  uint8_t* block_data_ = nullptr;

  // ndarray<int32>
  OwnedRefNoGIL placement_arr_;
  int64_t* placement_data_ = nullptr;

 private:
  ARROW_DISALLOW_COPY_AND_ASSIGN(PandasWriter);
};

template <typename InType, typename OutType>
inline void ConvertIntegerWithNulls(const PandasOptions& options,
                                    const ChunkedArray& data, OutType* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    const InType* in_values = GetPrimitiveValues<InType>(arr);
    // Upcast to double, set NaN as appropriate

    for (int i = 0; i < arr.length(); ++i) {
      *out_values++ =
          arr.IsNull(i) ? static_cast<OutType>(NAN) : static_cast<OutType>(in_values[i]);
    }
  }
}

template <typename T>
inline void ConvertIntegerNoNullsSameType(const PandasOptions& options,
                                          const ChunkedArray& data, T* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    if (arr.length() > 0) {
      const T* in_values = GetPrimitiveValues<T>(arr);
      memcpy(out_values, in_values, sizeof(T) * arr.length());
      out_values += arr.length();
    }
  }
}

template <typename InType, typename OutType>
inline void ConvertIntegerNoNullsCast(const PandasOptions& options,
                                      const ChunkedArray& data, OutType* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    const InType* in_values = GetPrimitiveValues<InType>(arr);
    for (int64_t i = 0; i < arr.length(); ++i) {
      *out_values = in_values[i];
    }
  }
}

template <typename T, typename Enable = void>
struct MemoizationTraits {
  using Scalar = typename T::c_type;
};

template <typename T>
struct MemoizationTraits<T, enable_if_has_string_view<T>> {
  // For binary, we memoize string_view as a scalar value to avoid having to
  // unnecessarily copy the memory into the memo table data structure
  using Scalar = std::string_view;
};

// Generic Array -> PyObject** converter that handles object deduplication, if
// requested
template <typename Type, typename WrapFunction>
inline Status ConvertAsPyObjects(const PandasOptions& options, const ChunkedArray& data,
                                 WrapFunction&& wrap_func, PyObject** out_values) {
  using ArrayType = typename TypeTraits<Type>::ArrayType;
  using Scalar = typename MemoizationTraits<Type>::Scalar;

  auto convert_chunks = [&](auto&& wrap_func) -> Status {
    for (int c = 0; c < data.num_chunks(); c++) {
      const auto& arr = arrow::internal::checked_cast<const ArrayType&>(*data.chunk(c));
      RETURN_NOT_OK(internal::WriteArrayObjects(arr, wrap_func, out_values));
      out_values += arr.length();
    }
    return Status::OK();
  };

  if (options.deduplicate_objects) {
    // GH-40316: only allocate a memo table if deduplication is enabled.
    ::arrow::internal::ScalarMemoTable<Scalar> memo_table(options.pool);
    std::vector<PyObject*> unique_values;
    int32_t memo_size = 0;

    auto WrapMemoized = [&](const Scalar& value, PyObject** out_values) {
      int32_t memo_index;
      RETURN_NOT_OK(memo_table.GetOrInsert(value, &memo_index));
      if (memo_index == memo_size) {
        // New entry
        RETURN_NOT_OK(wrap_func(value, out_values));
        unique_values.push_back(*out_values);
        ++memo_size;
      } else {
        // Duplicate entry
        Py_INCREF(unique_values[memo_index]);
        *out_values = unique_values[memo_index];
      }
      return Status::OK();
    };
    return convert_chunks(std::move(WrapMemoized));
  } else {
    return convert_chunks(std::forward<WrapFunction>(wrap_func));
  }
}

Status ConvertStruct(PandasOptions options, const ChunkedArray& data,
                     PyObject** out_values) {
  if (data.num_chunks() == 0) {
    return Status::OK();
  }
  // ChunkedArray has at least one chunk
  auto arr = checked_cast<const StructArray*>(data.chunk(0).get());
  // Use it to cache the struct type and number of fields for all chunks
  int32_t num_fields = arr->num_fields();
  auto array_type = arr->type();
  std::vector<OwnedRef> fields_data(num_fields * data.num_chunks());
  OwnedRef dict_item;

  // See notes in MakeInnerOptions.
  options = MakeInnerOptions(std::move(options));
  // Don't blindly convert because timestamps in lists are handled differently.
  options.timestamp_as_object = true;

  for (int c = 0; c < data.num_chunks(); c++) {
    auto fields_data_offset = c * num_fields;
    auto arr = checked_cast<const StructArray*>(data.chunk(c).get());
    // Convert the struct arrays first
    for (int32_t i = 0; i < num_fields; i++) {
      auto field = arr->field(static_cast<int>(i));
      // In case the field is an extension array, use .storage() to convert to Pandas
      if (field->type()->id() == Type::EXTENSION) {
        const ExtensionArray& arr_ext = checked_cast<const ExtensionArray&>(*field);
        field = arr_ext.storage();
      }
      RETURN_NOT_OK(ConvertArrayToPandas(options, field, nullptr,
                                         fields_data[i + fields_data_offset].ref()));
      DCHECK(PyArray_Check(fields_data[i + fields_data_offset].obj()));
    }

    // Construct a dictionary for each row
    const bool has_nulls = data.null_count() > 0;
    for (int64_t i = 0; i < arr->length(); ++i) {
      if (has_nulls && arr->IsNull(i)) {
        Py_INCREF(Py_None);
        *out_values = Py_None;
      } else {
        // Build the new dict object for the row
        dict_item.reset(PyDict_New());
        RETURN_IF_PYERROR();
        for (int32_t field_idx = 0; field_idx < num_fields; ++field_idx) {
          OwnedRef field_value;
          auto name = array_type->field(static_cast<int>(field_idx))->name();
          if (!arr->field(static_cast<int>(field_idx))->IsNull(i)) {
            // Value exists in child array, obtain it
            auto array = reinterpret_cast<PyArrayObject*>(
                fields_data[field_idx + fields_data_offset].obj());
            auto ptr = reinterpret_cast<const char*>(PyArray_GETPTR1(array, i));
            field_value.reset(PyArray_GETITEM(array, ptr));
            RETURN_IF_PYERROR();
          } else {
            // Translate the Null to a None
            Py_INCREF(Py_None);
            field_value.reset(Py_None);
          }
          // PyDict_SetItemString increments reference count
          auto setitem_result =
              PyDict_SetItemString(dict_item.obj(), name.c_str(), field_value.obj());
          RETURN_IF_PYERROR();
          DCHECK_EQ(setitem_result, 0);
        }
        *out_values = dict_item.obj();
        // Grant ownership to the resulting array
        Py_INCREF(*out_values);
      }
      ++out_values;
    }
  }
  return Status::OK();
}

Status DecodeDictionaries(MemoryPool* pool, const std::shared_ptr<DataType>& dense_type,
                          ArrayVector* arrays) {
  compute::ExecContext ctx(pool);
  compute::CastOptions options;
  for (size_t i = 0; i < arrays->size(); ++i) {
    ARROW_ASSIGN_OR_RAISE((*arrays)[i],
                          compute::Cast(*(*arrays)[i], dense_type, options, &ctx));
  }
  return Status::OK();
}

Status DecodeDictionaries(MemoryPool* pool, const std::shared_ptr<DataType>& dense_type,
                          std::shared_ptr<ChunkedArray>* array) {
  auto chunks = (*array)->chunks();
  RETURN_NOT_OK(DecodeDictionaries(pool, dense_type, &chunks));
  *array = std::make_shared<ChunkedArray>(std::move(chunks), dense_type);
  return Status::OK();
}

template <typename T>
enable_if_list_like<T, Status> ConvertListsLike(PandasOptions options,
                                                const ChunkedArray& data,
                                                PyObject** out_values) {
  using ListArrayT = typename TypeTraits<T>::ArrayType;
  // Get column of underlying value arrays
  ArrayVector value_arrays;
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = checked_cast<const ListArrayT&>(*data.chunk(c));
    // values() does not account for offsets, so we need to slice into it.
    // We can't use Flatten(), because it removes the values behind a null list
    // value, and that makes the offsets into original list values and our
    // flattened_values array different.
    std::shared_ptr<Array> flattened_values = arr.values()->Slice(
        arr.value_offset(0), arr.value_offset(arr.length()) - arr.value_offset(0));
    if (arr.value_type()->id() == Type::EXTENSION) {
      const auto& arr_ext = checked_cast<const ExtensionArray&>(*flattened_values);
      value_arrays.emplace_back(arr_ext.storage());
    } else {
      value_arrays.emplace_back(flattened_values);
    }
  }

  using ListArrayType = typename ListArrayT::TypeClass;
  const auto& list_type = checked_cast<const ListArrayType&>(*data.type());
  auto value_type = list_type.value_type();
  if (value_type->id() == Type::EXTENSION) {
    value_type = checked_cast<const ExtensionType&>(*value_type).storage_type();
  }

  auto flat_column = std::make_shared<ChunkedArray>(value_arrays, value_type);

  options = MakeInnerOptions(std::move(options));

  OwnedRefNoGIL owned_numpy_array;
  RETURN_NOT_OK(ConvertChunkedArrayToPandas(options, flat_column, nullptr,
                                            owned_numpy_array.ref()));
  PyObject* numpy_array = owned_numpy_array.obj();
  DCHECK(PyArray_Check(numpy_array));

  int64_t chunk_offset = 0;
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = checked_cast<const ListArrayT&>(*data.chunk(c));
    const bool has_nulls = data.null_count() > 0;
    for (int64_t i = 0; i < arr.length(); ++i) {
      if (has_nulls && arr.IsNull(i)) {
        Py_INCREF(Py_None);
        *out_values = Py_None;
      } else {
        // Need to subtract value_offset(0) since the original chunk might be a slice
        // into another array.
        OwnedRef start(PyLong_FromLongLong(arr.value_offset(i) + chunk_offset -
                                           arr.value_offset(0)));
        OwnedRef end(PyLong_FromLongLong(arr.value_offset(i + 1) + chunk_offset -
                                         arr.value_offset(0)));
        OwnedRef slice(PySlice_New(start.obj(), end.obj(), nullptr));

        if (ARROW_PREDICT_FALSE(slice.obj() == nullptr)) {
          // Fall out of loop, will return from RETURN_IF_PYERROR
          break;
        }
        *out_values = PyObject_GetItem(numpy_array, slice.obj());

        if (*out_values == nullptr) {
          // Fall out of loop, will return from RETURN_IF_PYERROR
          break;
        }
      }
      ++out_values;
    }
    RETURN_IF_PYERROR();

    chunk_offset += arr.value_offset(arr.length()) - arr.value_offset(0);
  }

  return Status::OK();
}

// TODO GH-40579: optimize ListView conversion to avoid unnecessary copies
template <typename T>
enable_if_list_view<T, Status> ConvertListsLike(PandasOptions options,
                                                const ChunkedArray& data,
                                                PyObject** out_values) {
  using ListViewArrayType = typename TypeTraits<T>::ArrayType;
  using NonViewType =
      std::conditional_t<T::type_id == Type::LIST_VIEW, ListType, LargeListType>;
  using NonViewClass = typename TypeTraits<NonViewType>::ArrayType;
  ArrayVector list_arrays;
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = checked_cast<const ListViewArrayType&>(*data.chunk(c));
    ARROW_ASSIGN_OR_RAISE(auto non_view_array,
                          NonViewClass::FromListView(arr, options.pool));
    list_arrays.emplace_back(non_view_array);
  }
  auto chunked_array = std::make_shared<ChunkedArray>(list_arrays);
  return ConvertListsLike<NonViewType>(options, *chunked_array, out_values);
}

template <typename F1, typename F2, typename F3>
Status ConvertMapHelper(F1 resetRow, F2 addPairToRow, F3 stealRow,
                        const ChunkedArray& data, PyArrayObject* py_keys,
                        PyArrayObject* py_items,
                        // needed for null checks in items
                        const std::vector<std::shared_ptr<Array>> item_arrays,
                        PyObject** out_values) {
  OwnedRef key_value;
  OwnedRef item_value;

  int64_t chunk_offset = 0;
  for (int c = 0; c < data.num_chunks(); ++c) {
    const auto& arr = checked_cast<const MapArray&>(*data.chunk(c));
    const bool has_nulls = data.null_count() > 0;

    // Make a list of key/item pairs for each row in array
    for (int64_t i = 0; i < arr.length(); ++i) {
      if (has_nulls && arr.IsNull(i)) {
        Py_INCREF(Py_None);
        *out_values = Py_None;
      } else {
        int64_t entry_offset = arr.value_offset(i);
        int64_t num_pairs = arr.value_offset(i + 1) - entry_offset;

        // Build the new list object for the row of Python pairs
        RETURN_NOT_OK(resetRow(num_pairs));

        // Add each key/item pair in the row
        for (int64_t j = 0; j < num_pairs; ++j) {
          // Get key value, key is non-nullable for a valid row
          auto ptr_key = reinterpret_cast<const char*>(
              PyArray_GETPTR1(py_keys, chunk_offset + entry_offset + j));
          key_value.reset(PyArray_GETITEM(py_keys, ptr_key));
          RETURN_IF_PYERROR();

          if (item_arrays[c]->IsNull(entry_offset + j)) {
            // Translate the Null to a None
            Py_INCREF(Py_None);
            item_value.reset(Py_None);
          } else {
            // Get valid value from item array
            auto ptr_item = reinterpret_cast<const char*>(
                PyArray_GETPTR1(py_items, chunk_offset + entry_offset + j));
            item_value.reset(PyArray_GETITEM(py_items, ptr_item));
            RETURN_IF_PYERROR();
          }

          // Add the key/item pair to the row
          RETURN_NOT_OK(addPairToRow(j, key_value, item_value));
        }

        // Pass ownership to the resulting array
        *out_values = stealRow();
      }
      ++out_values;
    }
    RETURN_IF_PYERROR();

    chunk_offset += arr.values()->length();
  }

  return Status::OK();
}

// A more helpful error message around TypeErrors that may stem from unhashable keys
Status CheckMapAsPydictsTypeError() {
  if (ARROW_PREDICT_TRUE(!PyErr_Occurred())) {
    return Status::OK();
  }
  if (PyErr_ExceptionMatches(PyExc_TypeError)) {
    // Modify the error string directly, so it is re-raised
    // with our additional info.
    //
    // There are not many interesting things happening when this
    // is hit. This is intended to only be called directly after
    // PyDict_SetItem, where a finite set of errors could occur.
    PyObject *type, *value, *traceback;
    PyErr_Fetch(&type, &value, &traceback);
    std::string message;
    RETURN_NOT_OK(internal::PyObject_StdStringStr(value, &message));
    message +=
        ". If keys are not hashable, then you must use the option "
        "[maps_as_pydicts=None (default)]";

    // resets the error
    PyErr_SetString(PyExc_TypeError, message.c_str());
  }
  return ConvertPyError();
}

Status CheckForDuplicateKeys(bool error_on_duplicate_keys, Py_ssize_t total_dict_len,
                             Py_ssize_t total_raw_len) {
  if (total_dict_len < total_raw_len) {
    const char* message =
        "[maps_as_pydicts] "
        "After conversion of Arrow maps to pydicts, "
        "detected data loss due to duplicate keys. "
        "Original input length is [%lld], total converted pydict length is [%lld].";
    std::array<char, 256> buf;
    std::snprintf(buf.data(), buf.size(), message, total_raw_len, total_dict_len);

    if (error_on_duplicate_keys) {
      return Status::UnknownError(buf.data());
    } else {
      ARROW_LOG(WARNING) << buf.data();
    }
  }
  return Status::OK();
}

Status ConvertMap(PandasOptions options, const ChunkedArray& data,
                  PyObject** out_values) {
  // Get columns of underlying key/item arrays
  std::vector<std::shared_ptr<Array>> key_arrays;
  std::vector<std::shared_ptr<Array>> item_arrays;
  for (int c = 0; c < data.num_chunks(); ++c) {
    const auto& map_arr = checked_cast<const MapArray&>(*data.chunk(c));
    key_arrays.emplace_back(map_arr.keys());
    item_arrays.emplace_back(map_arr.items());
  }

  const auto& map_type = checked_cast<const MapType&>(*data.type());
  auto key_type = map_type.key_type();
  auto item_type = map_type.item_type();

  // ARROW-6899: Convert dictionary-encoded children to dense instead of
  // failing below. A more efficient conversion than this could be done later
  if (key_type->id() == Type::DICTIONARY) {
    auto dense_type = checked_cast<const DictionaryType&>(*key_type).value_type();
    RETURN_NOT_OK(DecodeDictionaries(options.pool, dense_type, &key_arrays));
    key_type = dense_type;
  }
  if (item_type->id() == Type::DICTIONARY) {
    auto dense_type = checked_cast<const DictionaryType&>(*item_type).value_type();
    RETURN_NOT_OK(DecodeDictionaries(options.pool, dense_type, &item_arrays));
    item_type = dense_type;
  }

  // See notes in MakeInnerOptions.
  options = MakeInnerOptions(std::move(options));
  // Don't blindly convert because timestamps in lists are handled differently.
  options.timestamp_as_object = true;

  auto flat_keys = std::make_shared<ChunkedArray>(key_arrays, key_type);
  auto flat_items = std::make_shared<ChunkedArray>(item_arrays, item_type);
  OwnedRefNoGIL owned_numpy_keys;
  RETURN_NOT_OK(
      ConvertChunkedArrayToPandas(options, flat_keys, nullptr, owned_numpy_keys.ref()));
  OwnedRefNoGIL owned_numpy_items;
  RETURN_NOT_OK(
      ConvertChunkedArrayToPandas(options, flat_items, nullptr, owned_numpy_items.ref()));
  PyArrayObject* py_keys = reinterpret_cast<PyArrayObject*>(owned_numpy_keys.obj());
  PyArrayObject* py_items = reinterpret_cast<PyArrayObject*>(owned_numpy_items.obj());

  if (options.maps_as_pydicts == MapConversionType::DEFAULT) {
    // The default behavior to express an Arrow MAP as a list of [(key, value), ...] pairs
    OwnedRef list_item;
    return ConvertMapHelper(
        [&list_item](int64_t num_pairs) {
          list_item.reset(PyList_New(num_pairs));
          return CheckPyError();
        },
        [&list_item](int64_t idx, OwnedRef& key_value, OwnedRef& item_value) {
          PyList_SET_ITEM(list_item.obj(), idx,
                          PyTuple_Pack(2, key_value.obj(), item_value.obj()));
          return CheckPyError();
        },
        [&list_item] { return list_item.detach(); }, data, py_keys, py_items, item_arrays,
        out_values);
  } else {
    // Use a native pydict
    OwnedRef dict_item;
    Py_ssize_t total_dict_len{0};
    Py_ssize_t total_raw_len{0};

    bool error_on_duplicate_keys;
    if (options.maps_as_pydicts == MapConversionType::LOSSY) {
      error_on_duplicate_keys = false;
    } else if (options.maps_as_pydicts == MapConversionType::STRICT_) {
      error_on_duplicate_keys = true;
    } else {
      auto val = std::underlying_type_t<MapConversionType>(options.maps_as_pydicts);
      return Status::UnknownError("Received unknown option for maps_as_pydicts: " +
                                  std::to_string(val));
    }

    auto status = ConvertMapHelper(
        [&dict_item, &total_raw_len](int64_t num_pairs) {
          total_raw_len += num_pairs;
          dict_item.reset(PyDict_New());
          return CheckPyError();
        },
        [&dict_item]([[maybe_unused]] int64_t idx, OwnedRef& key_value,
                     OwnedRef& item_value) {
          auto setitem_result =
              PyDict_SetItem(dict_item.obj(), key_value.obj(), item_value.obj());
          ARROW_RETURN_NOT_OK(CheckMapAsPydictsTypeError());
          // returns -1 if there are internal errors around hashing/resizing
          return setitem_result == 0 ? Status::OK()
                                     : Status::UnknownError(
                                           "[maps_as_pydicts] "
                                           "Unexpected failure inserting Arrow (key, "
                                           "value) pair into Python dict");
        },
        [&dict_item, &total_dict_len] {
          total_dict_len += PyDict_Size(dict_item.obj());
          return dict_item.detach();
        },
        data, py_keys, py_items, item_arrays, out_values);

    ARROW_RETURN_NOT_OK(status);
    // If there were no errors generating the pydicts,
    // then check if we detected any data loss from duplicate keys.
    return CheckForDuplicateKeys(error_on_duplicate_keys, total_dict_len, total_raw_len);
  }
}

template <typename InType, typename OutType>
inline void ConvertNumericNullable(const ChunkedArray& data, InType na_value,
                                   OutType* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    const InType* in_values = GetPrimitiveValues<InType>(arr);

    if (arr.null_count() > 0) {
      for (int64_t i = 0; i < arr.length(); ++i) {
        *out_values++ = arr.IsNull(i) ? na_value : in_values[i];
      }
    } else {
      memcpy(out_values, in_values, sizeof(InType) * arr.length());
      out_values += arr.length();
    }
  }
}

template <typename InType, typename OutType>
inline void ConvertNumericNullableCast(const ChunkedArray& data, InType na_value,
                                       OutType* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    const InType* in_values = GetPrimitiveValues<InType>(arr);

    for (int64_t i = 0; i < arr.length(); ++i) {
      *out_values++ = arr.IsNull(i) ? static_cast<OutType>(na_value)
                                    : static_cast<OutType>(in_values[i]);
    }
  }
}

template <int NPY_TYPE>
class TypedPandasWriter : public PandasWriter {
 public:
  using T = typename npy_traits<NPY_TYPE>::value_type;

  using PandasWriter::PandasWriter;

  Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
    if (CanZeroCopy(*data)) {
      PyObject* wrapped;
      npy_intp dims[2] = {static_cast<npy_intp>(num_columns_),
                          static_cast<npy_intp>(num_rows_)};
      RETURN_NOT_OK(
          MakeNumPyView(data->chunk(0), py_ref, NPY_TYPE, /*ndim=*/2, dims, &wrapped));
      SetBlockData(wrapped);
      return Status::OK();
    } else {
      RETURN_NOT_OK(CheckNotZeroCopyOnly(*data));
      RETURN_NOT_OK(EnsureAllocated());
      return CopyInto(data, /*rel_placement=*/0);
    }
  }

  Status CheckTypeExact(const DataType& type, Type::type expected) {
    if (type.id() != expected) {
      // TODO(wesm): stringify NumPy / pandas type
      return Status::NotImplemented("Cannot write Arrow data of type ", type.ToString());
    }
    return Status::OK();
  }

  T* GetBlockColumnStart(int64_t rel_placement) {
    return reinterpret_cast<T*>(block_data_) + rel_placement * num_rows_;
  }

 protected:
  Status Allocate() override { return AllocateNDArray(NPY_TYPE); }
};

struct ObjectWriterVisitor {
  const PandasOptions& options;
  const ChunkedArray& data;
  PyObject** out_values;

  Status Visit(const NullType& type) {
    for (int c = 0; c < data.num_chunks(); c++) {
      std::shared_ptr<Array> arr = data.chunk(c);

      for (int64_t i = 0; i < arr->length(); ++i) {
        // All values are null
        Py_INCREF(Py_None);
        *out_values = Py_None;
        ++out_values;
      }
    }
    return Status::OK();
  }

  Status Visit(const BooleanType& type) {
    for (int c = 0; c < data.num_chunks(); c++) {
      const auto& arr = checked_cast<const BooleanArray&>(*data.chunk(c));

      for (int64_t i = 0; i < arr.length(); ++i) {
        if (arr.IsNull(i)) {
          Py_INCREF(Py_None);
          *out_values++ = Py_None;
        } else if (arr.Value(i)) {
          // True
          Py_INCREF(Py_True);
          *out_values++ = Py_True;
        } else {
          // False
          Py_INCREF(Py_False);
          *out_values++ = Py_False;
        }
      }
    }
    return Status::OK();
  }

  template <typename Type>
  enable_if_integer<Type, Status> Visit(const Type& type) {
    using T = typename Type::c_type;
    auto WrapValue = [](T value, PyObject** out) {
      *out = std::is_signed<T>::value ? PyLong_FromLongLong(value)
                                      : PyLong_FromUnsignedLongLong(value);
      RETURN_IF_PYERROR();
      return Status::OK();
    };
    return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
  }

  template <typename Type>
  enable_if_t<is_base_binary_type<Type>::value || is_binary_view_like_type<Type>::value ||
                  is_fixed_size_binary_type<Type>::value,
              Status>
  Visit(const Type& type) {
    auto WrapValue = [](const std::string_view& view, PyObject** out) {
      *out = WrapBytes<Type>::Wrap(view.data(), view.length());
      if (*out == nullptr) {
        PyErr_Clear();
        return Status::UnknownError("Wrapping ", view, " failed");
      }
      return Status::OK();
    };
    return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
  }

  template <typename Type>
  enable_if_date<Type, Status> Visit(const Type& type) {
    auto WrapValue = [](typename Type::c_type value, PyObject** out) {
      RETURN_NOT_OK(internal::PyDate_from_int(value, Type::UNIT, out));
      RETURN_IF_PYERROR();
      return Status::OK();
    };
    return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
  }

  template <typename Type>
  enable_if_time<Type, Status> Visit(const Type& type) {
    const TimeUnit::type unit = type.unit();
    auto WrapValue = [unit](typename Type::c_type value, PyObject** out) {
      RETURN_NOT_OK(internal::PyTime_from_int(value, unit, out));
      RETURN_IF_PYERROR();
      return Status::OK();
    };
    return ConvertAsPyObjects<Type>(options, data, WrapValue, out_values);
  }

  template <typename Type>
  enable_if_timestamp<Type, Status> Visit(const Type& type) {
    const TimeUnit::type unit = type.unit();
    OwnedRef tzinfo;

    auto ConvertTimezoneNaive = [&](typename Type::c_type value, PyObject** out) {
      RETURN_NOT_OK(internal::PyDateTime_from_int(value, unit, out));
      RETURN_IF_PYERROR();
      return Status::OK();
    };
    auto ConvertTimezoneAware = [&](typename Type::c_type value, PyObject** out) {
      PyObject* naive_datetime;
      RETURN_NOT_OK(ConvertTimezoneNaive(value, &naive_datetime));

      // convert the timezone naive datetime object to timezone aware
      // two step conversion of the datetime mimics Python's code:
      // dt.replace(tzinfo=datetime.timezone.utc).astimezone(tzinfo)
      // first step: replacing timezone with timezone.utc (replace method)
      OwnedRef args(PyTuple_New(0));
      OwnedRef keywords(PyDict_New());
      PyDict_SetItemString(keywords.obj(), "tzinfo", PyDateTime_TimeZone_UTC);
      OwnedRef naive_datetime_replace(PyObject_GetAttrString(naive_datetime, "replace"));
      OwnedRef datetime_utc(
          PyObject_Call(naive_datetime_replace.obj(), args.obj(), keywords.obj()));
      // second step: adjust the datetime to tzinfo timezone (astimezone method)
      *out = PyObject_CallMethod(datetime_utc.obj(), "astimezone", "O", tzinfo.obj());

      // the timezone naive object is no longer required
      Py_DECREF(naive_datetime);
      RETURN_IF_PYERROR();

      return Status::OK();
    };

    if (!type.timezone().empty() && !options.ignore_timezone) {
      // convert timezone aware
      PyObject* tzobj;
      ARROW_ASSIGN_OR_RAISE(tzobj, internal::StringToTzinfo(type.timezone()));
      tzinfo.reset(tzobj);
      RETURN_IF_PYERROR();
      RETURN_NOT_OK(
          ConvertAsPyObjects<Type>(options, data, ConvertTimezoneAware, out_values));
    } else {
      // convert timezone naive
      RETURN_NOT_OK(
          ConvertAsPyObjects<Type>(options, data, ConvertTimezoneNaive, out_values));
    }

    return Status::OK();
  }

  template <typename Type>
  enable_if_t<std::is_same<Type, MonthDayNanoIntervalType>::value, Status> Visit(
      const Type& type) {
    OwnedRef args(PyTuple_New(0));
    OwnedRef kwargs(PyDict_New());
    RETURN_IF_PYERROR();
    auto to_date_offset = [&](const MonthDayNanoIntervalType::MonthDayNanos& interval,
                              PyObject** out) {
      DCHECK(internal::BorrowPandasDataOffsetType() != nullptr);
      // DateOffset objects do not add nanoseconds component to pd.Timestamp.
      // as of  Pandas 1.3.3
      // (https://github.com/pandas-dev/pandas/issues/43892).
      // So convert microseconds and remainder to preserve data
      // but give users more expected results.
      int64_t microseconds = interval.nanoseconds / 1000;
      int64_t nanoseconds;
      if (interval.nanoseconds >= 0) {
        nanoseconds = interval.nanoseconds % 1000;
      } else {
        nanoseconds = -((-interval.nanoseconds) % 1000);
      }

      PyDict_SetItemString(kwargs.obj(), "months", PyLong_FromLong(interval.months));
      PyDict_SetItemString(kwargs.obj(), "days", PyLong_FromLong(interval.days));
      PyDict_SetItemString(kwargs.obj(), "microseconds",
                           PyLong_FromLongLong(microseconds));
      PyDict_SetItemString(kwargs.obj(), "nanoseconds", PyLong_FromLongLong(nanoseconds));
      *out =
          PyObject_Call(internal::BorrowPandasDataOffsetType(), args.obj(), kwargs.obj());
      RETURN_IF_PYERROR();
      return Status::OK();
    };
    return ConvertAsPyObjects<MonthDayNanoIntervalType>(options, data, to_date_offset,
                                                        out_values);
  }

  template <typename DecimalT, typename DecimalArrayT>
  Status VisitDecimal(const DecimalT& type) {
    OwnedRef decimal;
    OwnedRef Decimal;
    RETURN_NOT_OK(internal::ImportModule("decimal", &decimal));
    RETURN_NOT_OK(internal::ImportFromModule(decimal.obj(), "Decimal", &Decimal));
    PyObject* decimal_constructor = Decimal.obj();

    for (int c = 0; c < data.num_chunks(); c++) {
      const auto& arr = checked_cast<const DecimalArrayT&>(*data.chunk(c));

      for (int64_t i = 0; i < arr.length(); ++i) {
        if (arr.IsNull(i)) {
          Py_INCREF(Py_None);
          *out_values++ = Py_None;
        } else {
          *out_values++ =
              internal::DecimalFromString(decimal_constructor, arr.FormatValue(i));
          RETURN_IF_PYERROR();
        }
      }
    }

    return Status::OK();
  }

  Status Visit(const Decimal32Type& type) {
    return VisitDecimal<Decimal32Type, Decimal32Array>(type);
  }

  Status Visit(const Decimal64Type& type) {
    return VisitDecimal<Decimal64Type, Decimal64Array>(type);
  }

  Status Visit(const Decimal128Type& type) {
    return VisitDecimal<Decimal128Type, Decimal128Array>(type);
  }

  Status Visit(const Decimal256Type& type) {
    return VisitDecimal<Decimal256Type, Decimal256Array>(type);
  }

  template <typename T>
  enable_if_t<is_list_like_type<T>::value || is_list_view_type<T>::value, Status> Visit(
      const T& type) {
    if (!ListTypeSupported(*type.value_type())) {
      return Status::NotImplemented(
          "Not implemented type for conversion from List to Pandas: ",
          type.value_type()->ToString());
    }
    return ConvertListsLike<T>(options, data, out_values);
  }

  Status Visit(const MapType& type) { return ConvertMap(options, data, out_values); }

  Status Visit(const StructType& type) {
    return ConvertStruct(options, data, out_values);
  }

  template <typename Type>
  enable_if_t<is_floating_type<Type>::value ||
                  std::is_same<DictionaryType, Type>::value ||
                  std::is_same<DurationType, Type>::value ||
                  std::is_same<RunEndEncodedType, Type>::value ||
                  std::is_same<ExtensionType, Type>::value ||
                  (std::is_base_of<IntervalType, Type>::value &&
                   !std::is_same<MonthDayNanoIntervalType, Type>::value) ||
                  std::is_base_of<UnionType, Type>::value,
              Status>
  Visit(const Type& type) {
    return Status::NotImplemented("No implemented conversion to object dtype: ",
                                  type.ToString());
  }
};

class ObjectWriter : public TypedPandasWriter<NPY_OBJECT> {
 public:
  using TypedPandasWriter<NPY_OBJECT>::TypedPandasWriter;
  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    PyAcquireGIL lock;
    ObjectWriterVisitor visitor{this->options_, *data,
                                this->GetBlockColumnStart(rel_placement)};
    return VisitTypeInline(*data->type(), &visitor);
  }
};

static inline bool IsNonNullContiguous(const ChunkedArray& data) {
  return data.num_chunks() == 1 && data.null_count() == 0;
}

template <int NPY_TYPE>
class IntWriter : public TypedPandasWriter<NPY_TYPE> {
 public:
  using ArrowType = typename npy_traits<NPY_TYPE>::TypeClass;
  using TypedPandasWriter<NPY_TYPE>::TypedPandasWriter;

  bool CanZeroCopy(const ChunkedArray& data) const override {
    return IsNonNullContiguous(data);
  }

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    RETURN_NOT_OK(this->CheckTypeExact(*data->type(), ArrowType::type_id));
    ConvertIntegerNoNullsSameType<typename ArrowType::c_type>(
        this->options_, *data, this->GetBlockColumnStart(rel_placement));
    return Status::OK();
  }
};

template <int NPY_TYPE>
class FloatWriter : public TypedPandasWriter<NPY_TYPE> {
 public:
  using ArrowType = typename npy_traits<NPY_TYPE>::TypeClass;
  using TypedPandasWriter<NPY_TYPE>::TypedPandasWriter;
  using T = typename ArrowType::c_type;

  bool CanZeroCopy(const ChunkedArray& data) const override {
    return IsNonNullContiguous(data) && data.type()->id() == ArrowType::type_id;
  }

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    Type::type in_type = data->type()->id();
    auto out_values = this->GetBlockColumnStart(rel_placement);

#define INTEGER_CASE(IN_TYPE)                                             \
  ConvertIntegerWithNulls<IN_TYPE, T>(this->options_, *data, out_values); \
  break;

    switch (in_type) {
      case Type::UINT8:
        INTEGER_CASE(uint8_t);
      case Type::INT8:
        INTEGER_CASE(int8_t);
      case Type::UINT16:
        INTEGER_CASE(uint16_t);
      case Type::INT16:
        INTEGER_CASE(int16_t);
      case Type::UINT32:
        INTEGER_CASE(uint32_t);
      case Type::INT32:
        INTEGER_CASE(int32_t);
      case Type::UINT64:
        INTEGER_CASE(uint64_t);
      case Type::INT64:
        INTEGER_CASE(int64_t);
      case Type::HALF_FLOAT:
        ConvertNumericNullableCast(*data, npy_traits<NPY_TYPE>::na_sentinel, out_values);
      case Type::FLOAT:
        ConvertNumericNullableCast(*data, npy_traits<NPY_TYPE>::na_sentinel, out_values);
        break;
      case Type::DOUBLE:
        ConvertNumericNullableCast(*data, npy_traits<NPY_TYPE>::na_sentinel, out_values);
        break;
      default:
        return Status::NotImplemented("Cannot write Arrow data of type ",
                                      data->type()->ToString(),
                                      " to a Pandas floating point block");
    }

#undef INTEGER_CASE

    return Status::OK();
  }
};

using UInt8Writer = IntWriter<NPY_UINT8>;
using Int8Writer = IntWriter<NPY_INT8>;
using UInt16Writer = IntWriter<NPY_UINT16>;
using Int16Writer = IntWriter<NPY_INT16>;
using UInt32Writer = IntWriter<NPY_UINT32>;
using Int32Writer = IntWriter<NPY_INT32>;
using UInt64Writer = IntWriter<NPY_UINT64>;
using Int64Writer = IntWriter<NPY_INT64>;
using Float16Writer = FloatWriter<NPY_FLOAT16>;
using Float32Writer = FloatWriter<NPY_FLOAT32>;
using Float64Writer = FloatWriter<NPY_FLOAT64>;

class BoolWriter : public TypedPandasWriter<NPY_BOOL> {
 public:
  using TypedPandasWriter<NPY_BOOL>::TypedPandasWriter;

  Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
    RETURN_NOT_OK(
        CheckNoZeroCopy("Zero copy conversions not possible with "
                        "boolean types"));
    RETURN_NOT_OK(EnsureAllocated());
    return CopyInto(data, /*rel_placement=*/0);
  }

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    RETURN_NOT_OK(this->CheckTypeExact(*data->type(), Type::BOOL));
    auto out_values = this->GetBlockColumnStart(rel_placement);
    for (int c = 0; c < data->num_chunks(); c++) {
      const auto& arr = checked_cast<const BooleanArray&>(*data->chunk(c));
      for (int64_t i = 0; i < arr.length(); ++i) {
        *out_values++ = static_cast<uint8_t>(arr.Value(i));
      }
    }
    return Status::OK();
  }
};

// ----------------------------------------------------------------------
// Date / timestamp types

template <typename T, int64_t SHIFT>
inline void ConvertDatetime(const ChunkedArray& data, int64_t* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    const T* in_values = GetPrimitiveValues<T>(arr);

    for (int64_t i = 0; i < arr.length(); ++i) {
      *out_values++ = arr.IsNull(i) ? kPandasTimestampNull
                                    : (static_cast<int64_t>(in_values[i]) * SHIFT);
    }
  }
}

template <typename T, int SHIFT>
void ConvertDatesShift(const ChunkedArray& data, int64_t* out_values) {
  for (int c = 0; c < data.num_chunks(); c++) {
    const auto& arr = *data.chunk(c);
    const T* in_values = GetPrimitiveValues<T>(arr);
    for (int64_t i = 0; i < arr.length(); ++i) {
      *out_values++ = arr.IsNull(i) ? kPandasTimestampNull
                                    : static_cast<int64_t>(in_values[i]) / SHIFT;
    }
  }
}

class DatetimeDayWriter : public TypedPandasWriter<NPY_DATETIME> {
 public:
  using TypedPandasWriter<NPY_DATETIME>::TypedPandasWriter;

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    int64_t* out_values = this->GetBlockColumnStart(rel_placement);
    const auto& type = checked_cast<const DateType&>(*data->type());
    switch (type.unit()) {
      case DateUnit::DAY:
        ConvertDatesShift<int32_t, 1LL>(*data, out_values);
        break;
      case DateUnit::MILLI:
        ConvertDatesShift<int64_t, 86400000LL>(*data, out_values);
        break;
    }
    return Status::OK();
  }

 protected:
  Status Allocate() override {
    RETURN_NOT_OK(this->AllocateNDArray(NPY_DATETIME));
    SetDatetimeUnit(NPY_FR_D);
    return Status::OK();
  }
};

template <TimeUnit::type UNIT>
class DatetimeWriter : public TypedPandasWriter<NPY_DATETIME> {
 public:
  using TypedPandasWriter<NPY_DATETIME>::TypedPandasWriter;

  bool CanZeroCopy(const ChunkedArray& data) const override {
    if (data.type()->id() == Type::TIMESTAMP) {
      const auto& type = checked_cast<const TimestampType&>(*data.type());
      return IsNonNullContiguous(data) && type.unit() == UNIT;
    } else {
      return false;
    }
  }

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    const auto& ts_type = checked_cast<const TimestampType&>(*data->type());
    DCHECK_EQ(UNIT, ts_type.unit()) << "Should only call instances of this writer "
                                    << "with arrays of the correct unit";
    ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull,
                                    this->GetBlockColumnStart(rel_placement));
    return Status::OK();
  }

 protected:
  Status Allocate() override {
    RETURN_NOT_OK(this->AllocateNDArray(NPY_DATETIME));
    SetDatetimeUnit(internal::NumPyFrequency(UNIT));
    return Status::OK();
  }
};

using DatetimeSecondWriter = DatetimeWriter<TimeUnit::SECOND>;

class DatetimeMilliWriter : public DatetimeWriter<TimeUnit::MILLI> {
 public:
  using DatetimeWriter<TimeUnit::MILLI>::DatetimeWriter;

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    Type::type type = data->type()->id();
    int64_t* out_values = this->GetBlockColumnStart(rel_placement);
    if (type == Type::DATE32) {
      // Convert from days since epoch to datetime64[ms]
      ConvertDatetime<int32_t, 86400000L>(*data, out_values);
    } else if (type == Type::DATE64) {
      ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull, out_values);
    } else {
      const auto& ts_type = checked_cast<const TimestampType&>(*data->type());
      DCHECK_EQ(TimeUnit::MILLI, ts_type.unit())
          << "Should only call instances of this writer "
          << "with arrays of the correct unit";
      ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull, out_values);
    }
    return Status::OK();
  }
};

using DatetimeMicroWriter = DatetimeWriter<TimeUnit::MICRO>;

class DatetimeNanoWriter : public DatetimeWriter<TimeUnit::NANO> {
 public:
  using DatetimeWriter<TimeUnit::NANO>::DatetimeWriter;

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    Type::type type = data->type()->id();
    int64_t* out_values = this->GetBlockColumnStart(rel_placement);
    compute::ExecContext ctx(options_.pool);
    compute::CastOptions options;
    if (options_.safe_cast) {
      options = compute::CastOptions::Safe();
    } else {
      options = compute::CastOptions::Unsafe();
    }
    Datum out;
    auto target_type = timestamp(TimeUnit::NANO);

    if (type == Type::DATE32) {
      // Convert from days since epoch to datetime64[ns]
      ConvertDatetime<int32_t, kNanosecondsInDay>(*data, out_values);
    } else if (type == Type::DATE64) {
      // Date64Type is millisecond timestamp stored as int64_t
      // TODO(wesm): Do we want to make sure to zero out the milliseconds?
      ConvertDatetime<int64_t, 1000000L>(*data, out_values);
    } else if (type == Type::TIMESTAMP) {
      const auto& ts_type = checked_cast<const TimestampType&>(*data->type());

      if (ts_type.unit() == TimeUnit::NANO) {
        ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull, out_values);
      } else if (ts_type.unit() == TimeUnit::MICRO || ts_type.unit() == TimeUnit::MILLI ||
                 ts_type.unit() == TimeUnit::SECOND) {
        ARROW_ASSIGN_OR_RAISE(out, compute::Cast(data, target_type, options, &ctx));
        ConvertNumericNullable<int64_t>(*out.chunked_array(), kPandasTimestampNull,
                                        out_values);
      } else {
        return Status::NotImplemented("Unsupported time unit");
      }
    } else {
      return Status::NotImplemented("Cannot write Arrow data of type ",
                                    data->type()->ToString(),
                                    " to a Pandas datetime block.");
    }
    return Status::OK();
  }
};

template <typename BASE>
class DatetimeTZWriter : public BASE {
 public:
  DatetimeTZWriter(const PandasOptions& options, const std::string& timezone,
                   int64_t num_rows)
      : BASE(options, num_rows, 1), timezone_(timezone) {}

 protected:
  Status GetResultBlock(PyObject** out) override {
    RETURN_NOT_OK(this->MakeBlock1D());
    *out = this->block_arr_.obj();
    return Status::OK();
  }

  Status AddResultMetadata(PyObject* result) override {
    PyObject* py_tz = PyUnicode_FromStringAndSize(
        timezone_.c_str(), static_cast<Py_ssize_t>(timezone_.size()));
    RETURN_IF_PYERROR();
    PyDict_SetItemString(result, "timezone", py_tz);
    Py_DECREF(py_tz);
    return Status::OK();
  }

 private:
  std::string timezone_;
};

using DatetimeSecondTZWriter = DatetimeTZWriter<DatetimeSecondWriter>;
using DatetimeMilliTZWriter = DatetimeTZWriter<DatetimeMilliWriter>;
using DatetimeMicroTZWriter = DatetimeTZWriter<DatetimeMicroWriter>;
using DatetimeNanoTZWriter = DatetimeTZWriter<DatetimeNanoWriter>;

template <TimeUnit::type UNIT>
class TimedeltaWriter : public TypedPandasWriter<NPY_TIMEDELTA> {
 public:
  using TypedPandasWriter<NPY_TIMEDELTA>::TypedPandasWriter;

  Status AllocateTimedelta(int ndim) {
    RETURN_NOT_OK(this->AllocateNDArray(NPY_TIMEDELTA, ndim));
    SetDatetimeUnit(internal::NumPyFrequency(UNIT));
    return Status::OK();
  }

  bool CanZeroCopy(const ChunkedArray& data) const override {
    const auto& type = checked_cast<const DurationType&>(*data.type());
    return IsNonNullContiguous(data) && type.unit() == UNIT;
  }

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    const auto& type = checked_cast<const DurationType&>(*data->type());
    DCHECK_EQ(UNIT, type.unit()) << "Should only call instances of this writer "
                                 << "with arrays of the correct unit";
    ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull,
                                    this->GetBlockColumnStart(rel_placement));
    return Status::OK();
  }

 protected:
  Status Allocate() override { return AllocateTimedelta(2); }
};

using TimedeltaSecondWriter = TimedeltaWriter<TimeUnit::SECOND>;
using TimedeltaMilliWriter = TimedeltaWriter<TimeUnit::MILLI>;
using TimedeltaMicroWriter = TimedeltaWriter<TimeUnit::MICRO>;

class TimedeltaNanoWriter : public TimedeltaWriter<TimeUnit::NANO> {
 public:
  using TimedeltaWriter<TimeUnit::NANO>::TimedeltaWriter;

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    Type::type type = data->type()->id();
    int64_t* out_values = this->GetBlockColumnStart(rel_placement);
    if (type == Type::DURATION) {
      const auto& ts_type = checked_cast<const DurationType&>(*data->type());
      if (ts_type.unit() == TimeUnit::NANO) {
        ConvertNumericNullable<int64_t>(*data, kPandasTimestampNull, out_values);
      } else if (ts_type.unit() == TimeUnit::MICRO) {
        ConvertDatetime<int64_t, 1000L>(*data, out_values);
      } else if (ts_type.unit() == TimeUnit::MILLI) {
        ConvertDatetime<int64_t, 1000000L>(*data, out_values);
      } else if (ts_type.unit() == TimeUnit::SECOND) {
        ConvertDatetime<int64_t, 1000000000L>(*data, out_values);
      } else {
        return Status::NotImplemented("Unsupported time unit");
      }
    } else {
      return Status::NotImplemented("Cannot write Arrow data of type ",
                                    data->type()->ToString(),
                                    " to a Pandas timedelta block.");
    }
    return Status::OK();
  }
};

Status MakeZeroLengthArray(const std::shared_ptr<DataType>& type,
                           std::shared_ptr<Array>* out) {
  std::unique_ptr<ArrayBuilder> builder;
  RETURN_NOT_OK(MakeBuilder(default_memory_pool(), type, &builder));
  RETURN_NOT_OK(builder->Resize(0));
  return builder->Finish(out);
}

bool NeedDictionaryUnification(const ChunkedArray& data) {
  if (data.num_chunks() < 2) {
    return false;
  }
  const auto& arr_first = checked_cast<const DictionaryArray&>(*data.chunk(0));
  for (int c = 1; c < data.num_chunks(); c++) {
    const auto& arr = checked_cast<const DictionaryArray&>(*data.chunk(c));
    if (!(arr_first.dictionary()->Equals(arr.dictionary()))) {
      return true;
    }
  }
  return false;
}

template <typename IndexType>
class CategoricalWriter
    : public TypedPandasWriter<arrow_traits<IndexType::type_id>::npy_type> {
 public:
  using TRAITS = arrow_traits<IndexType::type_id>;
  using ArrayType = typename TypeTraits<IndexType>::ArrayType;
  using T = typename TRAITS::T;

  explicit CategoricalWriter(const PandasOptions& options, int64_t num_rows)
      : TypedPandasWriter<TRAITS::npy_type>(options, num_rows, 1),
        ordered_(false),
        needs_copy_(false) {}

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    return Status::NotImplemented("categorical type");
  }

  Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
    const auto& dict_type = checked_cast<const DictionaryType&>(*data->type());
    std::shared_ptr<Array> dict;
    if (data->num_chunks() == 0) {
      // no dictionary values => create empty array
      RETURN_NOT_OK(this->AllocateNDArray(TRAITS::npy_type, 1));
      RETURN_NOT_OK(MakeZeroLengthArray(dict_type.value_type(), &dict));
    } else {
      DCHECK_EQ(IndexType::type_id, dict_type.index_type()->id());
      RETURN_NOT_OK(WriteIndices(*data, &dict));
    }

    PyObject* pydict;
    RETURN_NOT_OK(ConvertArrayToPandas(this->options_, dict, nullptr, &pydict));
    dictionary_.reset(pydict);
    ordered_ = dict_type.ordered();
    return Status::OK();
  }

  Status Write(std::shared_ptr<ChunkedArray> data, int64_t abs_placement,
               int64_t rel_placement) override {
    RETURN_NOT_OK(this->EnsurePlacementAllocated());
    RETURN_NOT_OK(TransferSingle(data, /*py_ref=*/nullptr));
    this->placement_data_[rel_placement] = abs_placement;
    return Status::OK();
  }

  Status GetSeriesResult(PyObject** out) override {
    PyAcquireGIL lock;

    PyObject* result = PyDict_New();
    RETURN_IF_PYERROR();

    // Expected single array dictionary layout
    PyDict_SetItemString(result, "indices", this->block_arr_.obj());
    RETURN_IF_PYERROR();
    RETURN_NOT_OK(AddResultMetadata(result));

    *out = result;
    return Status::OK();
  }

 protected:
  Status AddResultMetadata(PyObject* result) override {
    PyDict_SetItemString(result, "dictionary", dictionary_.obj());
    PyObject* py_ordered = ordered_ ? Py_True : Py_False;
    Py_INCREF(py_ordered);
    PyDict_SetItemString(result, "ordered", py_ordered);
    return Status::OK();
  }

  Status WriteIndicesUniform(const ChunkedArray& data) {
    RETURN_NOT_OK(this->AllocateNDArray(TRAITS::npy_type, 1));
    T* out_values = reinterpret_cast<T*>(this->block_data_);

    for (int c = 0; c < data.num_chunks(); c++) {
      const auto& arr = checked_cast<const DictionaryArray&>(*data.chunk(c));
      const auto& indices = checked_cast<const ArrayType&>(*arr.indices());
      auto values = reinterpret_cast<const T*>(indices.raw_values());

      RETURN_NOT_OK(CheckIndexBounds(*indices.data(), arr.dictionary()->length()));
      // Null is -1 in CategoricalBlock
      for (int i = 0; i < arr.length(); ++i) {
        if (indices.IsValid(i)) {
          *out_values++ = values[i];
        } else {
          *out_values++ = -1;
        }
      }
    }
    return Status::OK();
  }

  Status WriteIndicesVarying(const ChunkedArray& data, std::shared_ptr<Array>* out_dict) {
    // Yield int32 indices to allow for dictionary outgrowing the current index
    // type
    RETURN_NOT_OK(this->AllocateNDArray(NPY_INT32, 1));
    auto out_values = reinterpret_cast<int32_t*>(this->block_data_);

    const auto& dict_type = checked_cast<const DictionaryType&>(*data.type());

    ARROW_ASSIGN_OR_RAISE(auto unifier, DictionaryUnifier::Make(dict_type.value_type(),
                                                                this->options_.pool));
    for (int c = 0; c < data.num_chunks(); c++) {
      const auto& arr = checked_cast<const DictionaryArray&>(*data.chunk(c));
      const auto& indices = checked_cast<const ArrayType&>(*arr.indices());
      auto values = reinterpret_cast<const T*>(indices.raw_values());

      std::shared_ptr<Buffer> transpose_buffer;
      RETURN_NOT_OK(unifier->Unify(*arr.dictionary(), &transpose_buffer));

      auto transpose = reinterpret_cast<const int32_t*>(transpose_buffer->data());
      int64_t dict_length = arr.dictionary()->length();

      RETURN_NOT_OK(CheckIndexBounds(*indices.data(), dict_length));

      // Null is -1 in CategoricalBlock
      for (int i = 0; i < arr.length(); ++i) {
        if (indices.IsValid(i)) {
          *out_values++ = transpose[values[i]];
        } else {
          *out_values++ = -1;
        }
      }
    }

    std::shared_ptr<DataType> unused_type;
    return unifier->GetResult(&unused_type, out_dict);
  }

  Status WriteIndices(const ChunkedArray& data, std::shared_ptr<Array>* out_dict) {
    DCHECK_GT(data.num_chunks(), 0);

    // Sniff the first chunk
    const auto& arr_first = checked_cast<const DictionaryArray&>(*data.chunk(0));
    const auto indices_first = std::static_pointer_cast<ArrayType>(arr_first.indices());

    if (data.num_chunks() == 1 && indices_first->null_count() == 0) {
      RETURN_NOT_OK(
          CheckIndexBounds(*indices_first->data(), arr_first.dictionary()->length()));

      PyObject* wrapped;
      npy_intp dims[1] = {static_cast<npy_intp>(this->num_rows_)};
      RETURN_NOT_OK(MakeNumPyView(indices_first, /*py_ref=*/nullptr, TRAITS::npy_type,
                                  /*ndim=*/1, dims, &wrapped));
      this->SetBlockData(wrapped);
      *out_dict = arr_first.dictionary();
    } else {
      RETURN_NOT_OK(this->CheckNotZeroCopyOnly(data));
      if (NeedDictionaryUnification(data)) {
        RETURN_NOT_OK(WriteIndicesVarying(data, out_dict));
      } else {
        RETURN_NOT_OK(WriteIndicesUniform(data));
        *out_dict = arr_first.dictionary();
      }
    }
    return Status::OK();
  }

  OwnedRefNoGIL dictionary_;
  bool ordered_;
  bool needs_copy_;
};

class ExtensionWriter : public PandasWriter {
 public:
  using PandasWriter::PandasWriter;

  Status Allocate() override {
    // no-op
    return Status::OK();
  }

  Status TransferSingle(std::shared_ptr<ChunkedArray> data, PyObject* py_ref) override {
    PyAcquireGIL lock;
    PyObject* py_array;
    py_array = wrap_chunked_array(data);
    py_array_.reset(py_array);

    return Status::OK();
  }

  Status CopyInto(std::shared_ptr<ChunkedArray> data, int64_t rel_placement) override {
    return TransferSingle(data, nullptr);
  }

  Status GetDataFrameResult(PyObject** out) override {
    PyAcquireGIL lock;
    PyObject* result = PyDict_New();
    RETURN_IF_PYERROR();

    PyDict_SetItemString(result, "py_array", py_array_.obj());
    PyDict_SetItemString(result, "placement", placement_arr_.obj());
    *out = result;
    return Status::OK();
  }

  Status GetSeriesResult(PyObject** out) override {
    *out = py_array_.detach();
    return Status::OK();
  }

 protected:
  OwnedRefNoGIL py_array_;
};

Status MakeWriter(const PandasOptions& options, PandasWriter::type writer_type,
                  const DataType& type, int64_t num_rows, int num_columns,
                  std::shared_ptr<PandasWriter>* writer) {
#define BLOCK_CASE(NAME, TYPE)                                        \
  case PandasWriter::NAME:                                            \
    *writer = std::make_shared<TYPE>(options, num_rows, num_columns); \
    break;

#define CATEGORICAL_CASE(TYPE)                                              \
  case TYPE::type_id:                                                       \
    *writer = std::make_shared<CategoricalWriter<TYPE>>(options, num_rows); \
    break;

#define TZ_CASE(NAME, TYPE)                                                  \
  case PandasWriter::NAME: {                                                 \
    const auto& ts_type = checked_cast<const TimestampType&>(type);          \
    *writer = std::make_shared<TYPE>(options, ts_type.timezone(), num_rows); \
  } break;

  switch (writer_type) {
    case PandasWriter::CATEGORICAL: {
      const auto& index_type = *checked_cast<const DictionaryType&>(type).index_type();
      switch (index_type.id()) {
        CATEGORICAL_CASE(Int8Type);
        CATEGORICAL_CASE(Int16Type);
        CATEGORICAL_CASE(Int32Type);
        CATEGORICAL_CASE(Int64Type);
        case Type::UINT8:
        case Type::UINT16:
        case Type::UINT32:
        case Type::UINT64:
          return Status::TypeError(
              "Converting unsigned dictionary indices to pandas",
              " not yet supported, index type: ", index_type.ToString());
        default:
          // Unreachable
          DCHECK(false);
          break;
      }
    } break;
    case PandasWriter::EXTENSION:
      *writer = std::make_shared<ExtensionWriter>(options, num_rows, num_columns);
      break;
      BLOCK_CASE(OBJECT, ObjectWriter);
      BLOCK_CASE(UINT8, UInt8Writer);
      BLOCK_CASE(INT8, Int8Writer);
      BLOCK_CASE(UINT16, UInt16Writer);
      BLOCK_CASE(INT16, Int16Writer);
      BLOCK_CASE(UINT32, UInt32Writer);
      BLOCK_CASE(INT32, Int32Writer);
      BLOCK_CASE(UINT64, UInt64Writer);
      BLOCK_CASE(INT64, Int64Writer);
      BLOCK_CASE(HALF_FLOAT, Float16Writer);
      BLOCK_CASE(FLOAT, Float32Writer);
      BLOCK_CASE(DOUBLE, Float64Writer);
      BLOCK_CASE(BOOL, BoolWriter);
      BLOCK_CASE(DATETIME_DAY, DatetimeDayWriter);
      BLOCK_CASE(DATETIME_SECOND, DatetimeSecondWriter);
      BLOCK_CASE(DATETIME_MILLI, DatetimeMilliWriter);
      BLOCK_CASE(DATETIME_MICRO, DatetimeMicroWriter);
      BLOCK_CASE(DATETIME_NANO, DatetimeNanoWriter);
      BLOCK_CASE(TIMEDELTA_SECOND, TimedeltaSecondWriter);
      BLOCK_CASE(TIMEDELTA_MILLI, TimedeltaMilliWriter);
      BLOCK_CASE(TIMEDELTA_MICRO, TimedeltaMicroWriter);
      BLOCK_CASE(TIMEDELTA_NANO, TimedeltaNanoWriter);
      TZ_CASE(DATETIME_SECOND_TZ, DatetimeSecondTZWriter);
      TZ_CASE(DATETIME_MILLI_TZ, DatetimeMilliTZWriter);
      TZ_CASE(DATETIME_MICRO_TZ, DatetimeMicroTZWriter);
      TZ_CASE(DATETIME_NANO_TZ, DatetimeNanoTZWriter);
    default:
      return Status::NotImplemented("Unsupported block type");
  }

#undef BLOCK_CASE
#undef CATEGORICAL_CASE

  return Status::OK();
}

static Status GetPandasWriterType(const ChunkedArray& data, const PandasOptions& options,
                                  PandasWriter::type* output_type) {
#define INTEGER_CASE(NAME)                                                             \
  *output_type =                                                                       \
      data.null_count() > 0                                                            \
          ? options.integer_object_nulls ? PandasWriter::OBJECT : PandasWriter::DOUBLE \
          : PandasWriter::NAME;                                                        \
  break;

  switch (data.type()->id()) {
    case Type::BOOL:
      *output_type = data.null_count() > 0 ? PandasWriter::OBJECT : PandasWriter::BOOL;
      break;
    case Type::UINT8:
      INTEGER_CASE(UINT8);
    case Type::INT8:
      INTEGER_CASE(INT8);
    case Type::UINT16:
      INTEGER_CASE(UINT16);
    case Type::INT16:
      INTEGER_CASE(INT16);
    case Type::UINT32:
      INTEGER_CASE(UINT32);
    case Type::INT32:
      INTEGER_CASE(INT32);
    case Type::UINT64:
      INTEGER_CASE(UINT64);
    case Type::INT64:
      INTEGER_CASE(INT64);
    case Type::HALF_FLOAT:
      *output_type = PandasWriter::HALF_FLOAT;
      break;
    case Type::FLOAT:
      *output_type = PandasWriter::FLOAT;
      break;
    case Type::DOUBLE:
      *output_type = PandasWriter::DOUBLE;
      break;
    case Type::STRING:        // fall through
    case Type::LARGE_STRING:  // fall through
    case Type::STRING_VIEW:   // fall through
    case Type::BINARY:        // fall through
    case Type::LARGE_BINARY:
    case Type::BINARY_VIEW:
    case Type::NA:                       // fall through
    case Type::FIXED_SIZE_BINARY:        // fall through
    case Type::STRUCT:                   // fall through
    case Type::TIME32:                   // fall through
    case Type::TIME64:                   // fall through
    case Type::DECIMAL128:               // fall through
    case Type::DECIMAL256:               // fall through
    case Type::INTERVAL_MONTH_DAY_NANO:  // fall through
      *output_type = PandasWriter::OBJECT;
      break;
    case Type::DATE32:
      if (options.date_as_object) {
        *output_type = PandasWriter::OBJECT;
      } else if (options.coerce_temporal_nanoseconds) {
        *output_type = PandasWriter::DATETIME_NANO;
      } else if (options.to_numpy) {
        // Numpy supports Day, but Pandas does not
        *output_type = PandasWriter::DATETIME_DAY;
      } else {
        *output_type = PandasWriter::DATETIME_MILLI;
      }
      break;
    case Type::DATE64:
      if (options.date_as_object) {
        *output_type = PandasWriter::OBJECT;
      } else if (options.coerce_temporal_nanoseconds) {
        *output_type = PandasWriter::DATETIME_NANO;
      } else {
        *output_type = PandasWriter::DATETIME_MILLI;
      }
      break;
    case Type::TIMESTAMP: {
      const auto& ts_type = checked_cast<const TimestampType&>(*data.type());
      if (options.timestamp_as_object && ts_type.unit() != TimeUnit::NANO) {
        // Nanoseconds are never out of bounds for pandas, so in that case
        // we don't convert to object
        *output_type = PandasWriter::OBJECT;
      } else if (options.coerce_temporal_nanoseconds) {
        if (!ts_type.timezone().empty()) {
          *output_type = PandasWriter::DATETIME_NANO_TZ;
        } else {
          *output_type = PandasWriter::DATETIME_NANO;
        }
      } else {
        if (!ts_type.timezone().empty()) {
          switch (ts_type.unit()) {
            case TimeUnit::SECOND:
              *output_type = PandasWriter::DATETIME_SECOND_TZ;
              break;
            case TimeUnit::MILLI:
              *output_type = PandasWriter::DATETIME_MILLI_TZ;
              break;
            case TimeUnit::MICRO:
              *output_type = PandasWriter::DATETIME_MICRO_TZ;
              break;
            case TimeUnit::NANO:
              *output_type = PandasWriter::DATETIME_NANO_TZ;
              break;
          }
        } else {
          switch (ts_type.unit()) {
            case TimeUnit::SECOND:
              *output_type = PandasWriter::DATETIME_SECOND;
              break;
            case TimeUnit::MILLI:
              *output_type = PandasWriter::DATETIME_MILLI;
              break;
            case TimeUnit::MICRO:
              *output_type = PandasWriter::DATETIME_MICRO;
              break;
            case TimeUnit::NANO:
              *output_type = PandasWriter::DATETIME_NANO;
              break;
          }
        }
      }
    } break;
    case Type::DURATION: {
      const auto& dur_type = checked_cast<const DurationType&>(*data.type());
      if (options.coerce_temporal_nanoseconds) {
        *output_type = PandasWriter::TIMEDELTA_NANO;
      } else {
        switch (dur_type.unit()) {
          case TimeUnit::SECOND:
            *output_type = PandasWriter::TIMEDELTA_SECOND;
            break;
          case TimeUnit::MILLI:
            *output_type = PandasWriter::TIMEDELTA_MILLI;
            break;
          case TimeUnit::MICRO:
            *output_type = PandasWriter::TIMEDELTA_MICRO;
            break;
          case TimeUnit::NANO:
            *output_type = PandasWriter::TIMEDELTA_NANO;
            break;
        }
      }
    } break;
    case Type::FIXED_SIZE_LIST:
    case Type::LIST:
    case Type::LARGE_LIST:
    case Type::LIST_VIEW:
    case Type::LARGE_LIST_VIEW:
    case Type::MAP: {
      auto list_type = std::static_pointer_cast<BaseListType>(data.type());
      if (!ListTypeSupported(*list_type->value_type())) {
        return Status::NotImplemented("Not implemented type for Arrow list to pandas: ",
                                      list_type->value_type()->ToString());
      }
      *output_type = PandasWriter::OBJECT;
    } break;
    case Type::DICTIONARY:
      *output_type = PandasWriter::CATEGORICAL;
      break;
    case Type::EXTENSION:
      *output_type = PandasWriter::EXTENSION;
      break;
    default:
      return Status::NotImplemented(
          "No known equivalent Pandas block for Arrow data of type ",
          data.type()->ToString(), " is known.");
  }
  return Status::OK();
}

// Construct the exact pandas "BlockManager" memory layout
//
// * For each column determine the correct output pandas type
// * Allocate 2D blocks (ncols x nrows) for each distinct data type in output
// * Allocate  block placement arrays
// * Write Arrow columns out into each slice of memory; populate block
// * placement arrays as we go
class PandasBlockCreator {
 public:
  using WriterMap = std::unordered_map<int, std::shared_ptr<PandasWriter>>;

  explicit PandasBlockCreator(const PandasOptions& options, FieldVector fields,
                              ChunkedArrayVector arrays)
      : options_(options), fields_(std::move(fields)), arrays_(std::move(arrays)) {
    num_columns_ = static_cast<int>(arrays_.size());
    if (num_columns_ > 0) {
      num_rows_ = arrays_[0]->length();
    }
    column_block_placement_.resize(num_columns_);
  }
  virtual ~PandasBlockCreator() = default;

  virtual Status Convert(PyObject** out) = 0;

  Status AppendBlocks(const WriterMap& blocks, PyObject* list) {
    for (const auto& it : blocks) {
      PyObject* item;
      RETURN_NOT_OK(it.second->GetDataFrameResult(&item));
      if (PyList_Append(list, item) < 0) {
        RETURN_IF_PYERROR();
      }

      // ARROW-1017; PyList_Append increments object refcount
      Py_DECREF(item);
    }
    return Status::OK();
  }

 protected:
  PandasOptions options_;

  FieldVector fields_;
  ChunkedArrayVector arrays_;
  int num_columns_;
  int64_t num_rows_;

  // column num -> relative placement within internal block
  std::vector<int> column_block_placement_;
};

// Helper function for extension chunked arrays
// Constructing a storage chunked array of an extension chunked array
std::shared_ptr<ChunkedArray> GetStorageChunkedArray(std::shared_ptr<ChunkedArray> arr) {
  auto value_type = checked_cast<const ExtensionType&>(*arr->type()).storage_type();
  ArrayVector storage_arrays;
  for (int c = 0; c < arr->num_chunks(); c++) {
    const auto& arr_ext = checked_cast<const ExtensionArray&>(*arr->chunk(c));
    storage_arrays.emplace_back(arr_ext.storage());
  }
  return std::make_shared<ChunkedArray>(std::move(storage_arrays), value_type);
};

// Helper function to decode RunEndEncodedArray
Result<std::shared_ptr<ChunkedArray>> GetDecodedChunkedArray(
    std::shared_ptr<ChunkedArray> arr) {
  ARROW_ASSIGN_OR_RAISE(Datum decoded, compute::RunEndDecode(arr));
  DCHECK(decoded.is_chunked_array());
  return decoded.chunked_array();
};

class ConsolidatedBlockCreator : public PandasBlockCreator {
 public:
  using PandasBlockCreator::PandasBlockCreator;

  Status Convert(PyObject** out) override {
    column_types_.resize(num_columns_);
    RETURN_NOT_OK(CreateBlocks());
    RETURN_NOT_OK(WriteTableToBlocks());
    PyAcquireGIL lock;

    PyObject* result = PyList_New(0);
    RETURN_IF_PYERROR();

    RETURN_NOT_OK(AppendBlocks(blocks_, result));
    RETURN_NOT_OK(AppendBlocks(singleton_blocks_, result));

    *out = result;
    return Status::OK();
  }

  Status GetBlockType(int column_index, PandasWriter::type* out) {
    if (options_.extension_columns.count(fields_[column_index]->name())) {
      *out = PandasWriter::EXTENSION;
      return Status::OK();
    } else {
      // In case of an extension array default to the storage type
      if (arrays_[column_index]->type()->id() == Type::EXTENSION) {
        arrays_[column_index] = GetStorageChunkedArray(arrays_[column_index]);
      }
      // In case of a RunEndEncodedArray default to the values type
      else if (arrays_[column_index]->type()->id() == Type::RUN_END_ENCODED) {
        ARROW_ASSIGN_OR_RAISE(arrays_[column_index],
                              GetDecodedChunkedArray(arrays_[column_index]));
      }
      return GetPandasWriterType(*arrays_[column_index], options_, out);
    }
  }

  Status CreateBlocks() {
    for (int i = 0; i < num_columns_; ++i) {
      const DataType& type = *arrays_[i]->type();
      PandasWriter::type output_type;
      RETURN_NOT_OK(GetBlockType(i, &output_type));

      int block_placement = 0;
      std::shared_ptr<PandasWriter> writer;
      if (output_type == PandasWriter::CATEGORICAL ||
          output_type == PandasWriter::DATETIME_SECOND_TZ ||
          output_type == PandasWriter::DATETIME_MILLI_TZ ||
          output_type == PandasWriter::DATETIME_MICRO_TZ ||
          output_type == PandasWriter::DATETIME_NANO_TZ ||
          output_type == PandasWriter::EXTENSION) {
        RETURN_NOT_OK(MakeWriter(options_, output_type, type, num_rows_,
                                 /*num_columns=*/1, &writer));
        singleton_blocks_[i] = writer;
      } else {
        auto it = block_sizes_.find(output_type);
        if (it != block_sizes_.end()) {
          block_placement = it->second;
          // Increment count
          ++it->second;
        } else {
          // Add key to map
          block_sizes_[output_type] = 1;
        }
      }
      column_types_[i] = output_type;
      column_block_placement_[i] = block_placement;
    }

    // Create normal non-categorical blocks
    for (const auto& it : this->block_sizes_) {
      PandasWriter::type output_type = static_cast<PandasWriter::type>(it.first);
      std::shared_ptr<PandasWriter> block;
      RETURN_NOT_OK(MakeWriter(this->options_, output_type, /*unused*/ *null(), num_rows_,
                               it.second, &block));
      this->blocks_[output_type] = block;
    }
    return Status::OK();
  }

  Status GetWriter(int i, std::shared_ptr<PandasWriter>* block) {
    PandasWriter::type output_type = this->column_types_[i];
    switch (output_type) {
      case PandasWriter::CATEGORICAL:
      case PandasWriter::DATETIME_SECOND_TZ:
      case PandasWriter::DATETIME_MILLI_TZ:
      case PandasWriter::DATETIME_MICRO_TZ:
      case PandasWriter::DATETIME_NANO_TZ:
      case PandasWriter::EXTENSION: {
        auto it = this->singleton_blocks_.find(i);
        if (it == this->singleton_blocks_.end()) {
          return Status::KeyError("No block allocated");
        }
        *block = it->second;
      } break;
      default:
        auto it = this->blocks_.find(output_type);
        if (it == this->blocks_.end()) {
          return Status::KeyError("No block allocated");
        }
        *block = it->second;
        break;
    }
    return Status::OK();
  }

  Status WriteTableToBlocks() {
    auto WriteColumn = [this](int i) {
      std::shared_ptr<PandasWriter> block;
      RETURN_NOT_OK(this->GetWriter(i, &block));
      // ARROW-3789 Use std::move on the array to permit self-destructing
      return block->Write(std::move(arrays_[i]), i, this->column_block_placement_[i]);
    };

    return OptionalParallelFor(options_.use_threads, num_columns_, WriteColumn);
  }

 private:
  // column num -> block type id
  std::vector<PandasWriter::type> column_types_;

  // block type -> type count
  std::unordered_map<int, int> block_sizes_;
  std::unordered_map<int, const DataType*> block_types_;

  // block type -> block
  WriterMap blocks_;

  WriterMap singleton_blocks_;
};

/// \brief Create blocks for pandas.DataFrame block manager using one block per
/// column strategy. This permits some zero-copy optimizations as well as the
/// ability for the table to "self-destruct" if selected by the user.
class SplitBlockCreator : public PandasBlockCreator {
 public:
  using PandasBlockCreator::PandasBlockCreator;

  Status GetWriter(int i, std::shared_ptr<PandasWriter>* writer) {
    PandasWriter::type output_type = PandasWriter::OBJECT;
    const DataType& type = *arrays_[i]->type();
    if (options_.extension_columns.count(fields_[i]->name())) {
      output_type = PandasWriter::EXTENSION;
    } else {
      // Null count needed to determine output type
      RETURN_NOT_OK(GetPandasWriterType(*arrays_[i], options_, &output_type));
    }
    return MakeWriter(this->options_, output_type, type, num_rows_, 1, writer);
  }

  Status Convert(PyObject** out) override {
    PyAcquireGIL lock;

    PyObject* result = PyList_New(0);
    RETURN_IF_PYERROR();

    for (int i = 0; i < num_columns_; ++i) {
      std::shared_ptr<PandasWriter> writer;
      RETURN_NOT_OK(GetWriter(i, &writer));
      // ARROW-3789 Use std::move on the array to permit self-destructing
      RETURN_NOT_OK(writer->Write(std::move(arrays_[i]), i, /*rel_placement=*/0));

      PyObject* item;
      RETURN_NOT_OK(writer->GetDataFrameResult(&item));
      if (PyList_Append(result, item) < 0) {
        RETURN_IF_PYERROR();
      }
      // PyList_Append increments object refcount
      Py_DECREF(item);
    }

    *out = result;
    return Status::OK();
  }

 private:
  std::vector<std::shared_ptr<PandasWriter>> writers_;
};

Status ConvertCategoricals(const PandasOptions& options, ChunkedArrayVector* arrays,
                           FieldVector* fields) {
  std::vector<int> columns_to_encode;

  // For Categorical conversions
  auto EncodeColumn = [&](int j) {
    int i = columns_to_encode[j];
    if (options.zero_copy_only) {
      return Status::Invalid("Need to dictionary encode a column, but ",
                             "only zero-copy conversions allowed");
    }
    compute::ExecContext ctx(options.pool);
    ARROW_ASSIGN_OR_RAISE(
        Datum out, DictionaryEncode((*arrays)[i],
                                    compute::DictionaryEncodeOptions::Defaults(), &ctx));
    (*arrays)[i] = out.chunked_array();
    (*fields)[i] = (*fields)[i]->WithType((*arrays)[i]->type());
    return Status::OK();
  };

  if (!options.categorical_columns.empty()) {
    for (int i = 0; i < static_cast<int>(arrays->size()); i++) {
      if ((*arrays)[i]->type()->id() != Type::DICTIONARY &&
          options.categorical_columns.count((*fields)[i]->name())) {
        columns_to_encode.push_back(i);
      }
    }
  }
  if (options.strings_to_categorical) {
    for (int i = 0; i < static_cast<int>(arrays->size()); i++) {
      if (is_base_binary_like((*arrays)[i]->type()->id())) {
        columns_to_encode.push_back(i);
      }
    }
  }
  return OptionalParallelFor(options.use_threads,
                             static_cast<int>(columns_to_encode.size()), EncodeColumn);
}

}  // namespace

Status ConvertArrayToPandas(const PandasOptions& options, std::shared_ptr<Array> arr,
                            PyObject* py_ref, PyObject** out) {
  return ConvertChunkedArrayToPandas(
      options, std::make_shared<ChunkedArray>(std::move(arr)), py_ref, out);
}

Status ConvertChunkedArrayToPandas(const PandasOptions& options,
                                   std::shared_ptr<ChunkedArray> arr, PyObject* py_ref,
                                   PyObject** out) {
  if (options.decode_dictionaries && arr->type()->id() == Type::DICTIONARY) {
    // XXX we should return an error as below if options.zero_copy_only
    // is true, but that would break compatibility with existing tests.
    const auto& dense_type =
        checked_cast<const DictionaryType&>(*arr->type()).value_type();
    RETURN_NOT_OK(DecodeDictionaries(options.pool, dense_type, &arr));
    DCHECK_NE(arr->type()->id(), Type::DICTIONARY);

    // The original Python DictionaryArray won't own the memory anymore
    // as we actually built a new array when we decoded the DictionaryArray
    // thus let the final resulting numpy array own the memory through a Capsule
    py_ref = nullptr;
  }

  if (options.strings_to_categorical && is_base_binary_like(arr->type()->id())) {
    if (options.zero_copy_only) {
      return Status::Invalid("Need to dictionary encode a column, but ",
                             "only zero-copy conversions allowed");
    }
    compute::ExecContext ctx(options.pool);
    ARROW_ASSIGN_OR_RAISE(
        Datum out,
        DictionaryEncode(arr, compute::DictionaryEncodeOptions::Defaults(), &ctx));
    arr = out.chunked_array();
  }

  PandasOptions modified_options = options;
  modified_options.strings_to_categorical = false;

  // ARROW-7596: We permit the hybrid Series/DataFrame code path to do zero copy
  // optimizations that we do not allow in the default case when converting
  // Table->DataFrame
  modified_options.allow_zero_copy_blocks = true;

  // In case of an extension array default to the storage type
  if (arr->type()->id() == Type::EXTENSION) {
    arr = GetStorageChunkedArray(arr);
  }
  // In case of a RunEndEncodedArray decode the array
  else if (arr->type()->id() == Type::RUN_END_ENCODED) {
    if (options.zero_copy_only) {
      return Status::Invalid("Need to dencode a RunEndEncodedArray, but ",
                             "only zero-copy conversions allowed");
    }
    ARROW_ASSIGN_OR_RAISE(arr, GetDecodedChunkedArray(arr));

    // Because we built a new array when we decoded the RunEndEncodedArray
    // the final resulting numpy array should own the memory through a Capsule
    py_ref = nullptr;
  }

  PandasWriter::type output_type;
  RETURN_NOT_OK(GetPandasWriterType(*arr, modified_options, &output_type));
  if (options.decode_dictionaries) {
    DCHECK_NE(output_type, PandasWriter::CATEGORICAL);
  }

  std::shared_ptr<PandasWriter> writer;
  RETURN_NOT_OK(MakeWriter(modified_options, output_type, *arr->type(), arr->length(),
                           /*num_columns=*/1, &writer));
  RETURN_NOT_OK(writer->TransferSingle(std::move(arr), py_ref));
  return writer->GetSeriesResult(out);
}

Status ConvertTableToPandas(const PandasOptions& options, std::shared_ptr<Table> table,
                            PyObject** out) {
  ChunkedArrayVector arrays = table->columns();
  FieldVector fields = table->fields();

  // ARROW-3789: allow "self-destructing" by releasing references to columns as
  // we convert them to pandas
  table = nullptr;

  RETURN_NOT_OK(ConvertCategoricals(options, &arrays, &fields));

  PandasOptions modified_options = options;
  modified_options.strings_to_categorical = false;
  modified_options.categorical_columns.clear();

  if (options.split_blocks) {
    modified_options.allow_zero_copy_blocks = true;
    SplitBlockCreator helper(modified_options, std::move(fields), std::move(arrays));
    return helper.Convert(out);
  } else {
    ConsolidatedBlockCreator helper(modified_options, std::move(fields),
                                    std::move(arrays));
    return helper.Convert(out);
  }
}

}  // namespace py
}  // namespace arrow