Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 20.0.0.dev104 

/ _parquet.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++

from collections.abc import Sequence
from textwrap import indent
import warnings

from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
                          check_status,
                          MemoryPool, maybe_unbox_memory_pool,
                          Table, KeyValueMetadata,
                          pyarrow_wrap_chunked_array,
                          pyarrow_wrap_schema,
                          pyarrow_unwrap_metadata,
                          pyarrow_unwrap_schema,
                          pyarrow_wrap_table,
                          pyarrow_wrap_batch,
                          pyarrow_wrap_scalar,
                          NativeFile, get_reader, get_writer,
                          string_to_timeunit)

from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
                         _stringify_path,
                         tobytes, frombytes, is_threading_enabled)

cimport cpython as cp

# Row-group sizing constants: 1024*1024 as the default, 64*1024*1024 as
# the maximum.
# NOTE(review): presumably expressed in rows and consumed by the writer
# code elsewhere in this module -- confirm against the call sites.
_DEFAULT_ROW_GROUP_SIZE = 1024*1024
_MAX_ROW_GROUP_SIZE = 64*1024*1024

cdef class Statistics(_Weakrefable):
    """Statistics of one column chunk, i.e. one column within one row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        # Default object repr first, then one "  name: value" line per
        # attribute, joined with newlines.
        lines = [
            object.__repr__(self),
            "  has_min_max: {}".format(self.has_min_max),
            "  min: {}".format(self.min),
            "  max: {}".format(self.max),
            "  null_count: {}".format(self.null_count),
            "  distinct_count: {}".format(self.distinct_count),
            "  num_values: {}".format(self.num_values),
            "  physical_type: {}".format(self.physical_type),
            "  logical_type: {}".format(str(self.logical_type)),
            "  converted_type (legacy): {}".format(self.converted_type),
        ]
        return "\n".join(lines)

    def to_dict(self):
        """
        Convert the statistics into a plain dictionary.

        Returns
        -------
        dict
            Maps ``has_min_max``, ``min``, ``max``, ``null_count``,
            ``distinct_count``, ``num_values`` and ``physical_type`` to the
            value of the attribute of the same name.
        """
        return {
            "has_min_max": self.has_min_max,
            "min": self.min,
            "max": self.max,
            "null_count": self.null_count,
            "distinct_count": self.distinct_count,
            "num_values": self.num_values,
            "physical_type": self.physical_type,
        }

    def __eq__(self, other):
        # ``equals`` only accepts Statistics; the TypeError raised for any
        # other operand is turned into NotImplemented so that Python can
        # fall back to its default comparison handling.
        try:
            are_equal = self.equals(other)
        except TypeError:
            return NotImplemented
        return are_equal

    def equals(self, Statistics other):
        """
        Compare these statistics with another Statistics object.

        Parameters
        ----------
        other : Statistics
            The statistics to compare with.

        Returns
        -------
        are_equal : bool
        """
        return self.statistics.get().Equals(deref(other.statistics.get()))

    @property
    def has_min_max(self):
        """True if both a minimum and a maximum are available (bool)."""
        return self.statistics.get().HasMinMax()

    @property
    def has_null_count(self):
        """True if a null count is available (bool)."""
        return self.statistics.get().HasNullCount()

    @property
    def has_distinct_count(self):
        """True if a distinct count is available (bool)."""
        return self.statistics.get().HasDistinctCount()

    @property
    def min_raw(self):
        """
        Minimum in its physical representation (bool, int, float, or bytes).

        None when no min/max is available.
        """
        if not self.has_min_max:
            return None
        return _cast_statistic_raw_min(self.statistics.get())

    @property
    def max_raw(self):
        """
        Maximum in its physical representation (bool, int, float, or bytes).

        None when no min/max is available.
        """
        if not self.has_min_max:
            return None
        return _cast_statistic_raw_max(self.statistics.get())

    @property
    def min(self):
        """
        Minimum converted to the logical type.

        The value is the Python equivalent of the logical type, e.g.
        datetime.date for dates and decimal.Decimal for decimals.
        None when no min/max is available.
        """
        if not self.has_min_max:
            return None
        # Both scalars are produced together; only the minimum is needed.
        lower, _ = _cast_statistics(self.statistics.get())
        return lower.as_py()

    @property
    def max(self):
        """
        Maximum converted to the logical type.

        The value is the Python equivalent of the logical type, e.g.
        datetime.date for dates and decimal.Decimal for decimals.
        None when no min/max is available.
        """
        if not self.has_min_max:
            return None
        # Both scalars are produced together; only the maximum is needed.
        _, upper = _cast_statistics(self.statistics.get())
        return upper.as_py()

    @property
    def null_count(self):
        """Count of null values in the chunk (int), or None if not recorded."""
        if not self.has_null_count:
            return None
        return self.statistics.get().null_count()

    @property
    def distinct_count(self):
        """Count of distinct values in the chunk (int), or None if not recorded."""
        if not self.has_distinct_count:
            return None
        return self.statistics.get().distinct_count()

    @property
    def num_values(self):
        """Count of non-null values (int)."""
        return self.statistics.get().num_values()

    @property
    def physical_type(self):
        """Name of the column's physical type (str)."""
        return physical_type_name_from_enum(
            self.statistics.get().physical_type())

    @property
    def logical_type(self):
        """The column's logical type (:class:`ParquetLogicalType`)."""
        return wrap_logical_type(self.statistics.get().descr().logical_type())

    @property
    def converted_type(self):
        """Name of the legacy converted type (str or None)."""
        return converted_type_name_from_enum(
            self.statistics.get().descr().converted_type())


cdef class ParquetLogicalType(_Weakrefable):
    """Wrapper around the logical type of a Parquet column."""
    cdef:
        shared_ptr[const CParquetLogicalType] type

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
        # Keep a reference to the C++ logical type being wrapped.
        self.type = type

    def __repr__(self):
        # Default object repr, then the readable form indented on a new line.
        return "\n  ".join([object.__repr__(self), str(self)])

    def __str__(self):
        return frombytes(self.type.get().ToString(), safe=True)

    def to_json(self):
        """
        Serialize the type and its parameters as a JSON string.

        Returns
        -------
        json : str
            JSON document with at least a 'Type' field holding the type
            name. Parameterized types (for example a decimal with scale and
            precision) carry their parameters as additional fields.
        """
        return frombytes(self.type.get().ToJSON())

    @property
    def type(self):
        """The logical type's name (str)."""
        return logical_type_name_from_enum(self.type.get().type())


cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
    # Build a Python-level ParquetLogicalType around the given C++ type.
    cdef ParquetLogicalType wrapped = ParquetLogicalType()
    wrapped.init(type)
    return wrapped


cdef _cast_statistic_raw_min(CStatistics* statistics):
    # Return the minimum of ``statistics`` in its physical representation.
    # The generic pointer is downcast to the typed statistics class that
    # matches the physical type; physical types without a branch below
    # yield None.
    cdef:
        ParquetType physical_type = statistics.physical_type()
        uint32_t type_length = statistics.descr().type_length()

    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).min()
    if physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).min()
    if physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).min()
    if physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).min()
    if physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).min()
    if physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).min())
    if physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        # Fixed-length values carry no length of their own; use the
        # column descriptor's type length.
        return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)
    return None


cdef _cast_statistic_raw_max(CStatistics* statistics):
    # Return the maximum of ``statistics`` in its physical representation.
    # The generic pointer is downcast to the typed statistics class that
    # matches the physical type; physical types without a branch below
    # yield None.
    cdef:
        ParquetType physical_type = statistics.physical_type()
        uint32_t type_length = statistics.descr().type_length()

    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).max()
    if physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).max()
    if physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).max()
    if physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).max()
    if physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).max()
    if physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).max())
    if physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        # Fixed-length values carry no length of their own; use the
        # column descriptor's type length.
        return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)
    return None


cdef _cast_statistics(CStatistics* statistics):
    # Convert the min and max of ``statistics`` into Arrow scalars and
    # return them as a (min, max) tuple of wrapped Python scalars.
    cdef shared_ptr[CScalar] min_holder
    cdef shared_ptr[CScalar] max_holder

    # check_status is handed the Status produced by the conversion.
    check_status(StatisticsAsScalars(statistics[0], &min_holder, &max_holder))
    return (pyarrow_wrap_scalar(min_holder), pyarrow_wrap_scalar(max_holder))


cdef _box_byte_array(ParquetByteArray val):
    # Copy the ``val.len`` bytes starting at ``val.ptr`` into a Python bytes
    # object.
    cdef:
        char* data = <char*> val.ptr
        Py_ssize_t size = <Py_ssize_t> val.len
    return cp.PyBytes_FromStringAndSize(data, size)


cdef _box_flba(ParquetFLBA val, uint32_t len):
    # Copy ``len`` bytes starting at ``val.ptr`` into a Python bytes object;
    # the length is supplied by the caller rather than read from ``val``.
    cdef:
        char* data = <char*> val.ptr
        Py_ssize_t size = <Py_ssize_t> len
    return cp.PyBytes_FromStringAndSize(data, size)


cdef class ColumnChunkMetaData(_Weakrefable):
    """Metadata of one column chunk, i.e. one column within one row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        # The statistics repr spans several lines, so it is indented by four
        # spaces and placed on its own lines below the "statistics:" label.
        statistics = indent(repr(self.statistics), 4 * ' ')
        lines = [
            object.__repr__(self),
            "  file_offset: {0}".format(self.file_offset),
            "  file_path: {0}".format(self.file_path),
            "  physical_type: {0}".format(self.physical_type),
            "  num_values: {0}".format(self.num_values),
            "  path_in_schema: {0}".format(self.path_in_schema),
            "  is_stats_set: {0}".format(self.is_stats_set),
            "  statistics:",
            statistics,
            "  compression: {0}".format(self.compression),
            "  encodings: {0}".format(self.encodings),
            "  has_dictionary_page: {0}".format(self.has_dictionary_page),
            "  dictionary_page_offset: {0}".format(
                self.dictionary_page_offset),
            "  data_page_offset: {0}".format(self.data_page_offset),
            "  total_compressed_size: {0}".format(
                self.total_compressed_size),
            "  total_uncompressed_size: {0}".format(
                self.total_uncompressed_size),
        ]
        return "\n".join(lines)

    def to_dict(self):
        """
        Convert the column chunk metadata into a plain dictionary.

        Returns
        -------
        dict
            One entry per attribute shown in the repr; ``statistics`` holds
            the statistics' own dictionary, or None when no statistics are
            set.
        """
        if self.is_stats_set:
            statistics = self.statistics.to_dict()
        else:
            statistics = None
        return {
            "file_offset": self.file_offset,
            "file_path": self.file_path,
            "physical_type": self.physical_type,
            "num_values": self.num_values,
            "path_in_schema": self.path_in_schema,
            "is_stats_set": self.is_stats_set,
            "statistics": statistics,
            "compression": self.compression,
            "encodings": self.encodings,
            "has_dictionary_page": self.has_dictionary_page,
            "dictionary_page_offset": self.dictionary_page_offset,
            "data_page_offset": self.data_page_offset,
            "total_compressed_size": self.total_compressed_size,
            "total_uncompressed_size": self.total_uncompressed_size,
        }

    def __eq__(self, other):
        # ``equals`` only accepts ColumnChunkMetaData; the TypeError raised
        # for any other operand is turned into NotImplemented.
        try:
            are_equal = self.equals(other)
        except TypeError:
            return NotImplemented
        return are_equal

    def equals(self, ColumnChunkMetaData other):
        """
        Compare this column chunk metadata with another one.

        Parameters
        ----------
        other : ColumnChunkMetaData
            The metadata to compare with.

        Returns
        -------
        are_equal : bool
        """
        return self.metadata.Equals(deref(other.metadata))

    @property
    def file_offset(self):
        """Offset within the file at which the column chunk is located (int)."""
        return self.metadata.file_offset()

    @property
    def file_path(self):
        """Optional file path of the column chunk, decoded via ``frombytes``."""
        return frombytes(self.metadata.file_path())

    @property
    def physical_type(self):
        """Name of the column's physical type (str)."""
        return physical_type_name_from_enum(self.metadata.type())

    @property
    def num_values(self):
        """Total count of values (int)."""
        return self.metadata.num_values()

    @property
    def path_in_schema(self):
        """Dot-separated nested path to the field (str)."""
        path = self.metadata.path_in_schema().get().ToDotString()
        return frombytes(path)

    @property
    def is_stats_set(self):
        """True if the metadata carries statistics (bool)."""
        return self.metadata.is_stats_set()

    @property
    def statistics(self):
        """Statistics of the column chunk (:class:`Statistics`), or None if unset."""
        if not self.metadata.is_stats_set():
            return None
        stats = Statistics()
        # This object is passed along with the C++ statistics.
        stats.init(self.metadata.statistics(), self)
        return stats

    @property
    def compression(self):
        """
        Name of the compression applied to the column (str).

        One of 'UNCOMPRESSED', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD',
        or 'UNKNOWN'.
        """
        return compression_name_from_enum(self.metadata.compression())

    @property
    def encodings(self):
        """
        Names of the encodings applied to the column (tuple of str).

        One of 'PLAIN', 'BIT_PACKED', 'RLE', 'BYTE_STREAM_SPLIT', 'DELTA_BINARY_PACKED',
        'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY'.
        """
        return tuple(map(encoding_name_from_enum, self.metadata.encodings()))

    @property
    def has_dictionary_page(self):
        """True if the column chunk contains dictionary data (bool)."""
        return bool(self.metadata.has_dictionary_page())

    @property
    def dictionary_page_offset(self):
        """
        Offset of the dictionary page from the start of the file (int).

        None when the chunk has no dictionary page.
        """
        if not self.has_dictionary_page:
            return None
        return self.metadata.dictionary_page_offset()

    @property
    def data_page_offset(self):
        """Offset of the data page from the start of the file (int)."""
        return self.metadata.data_page_offset()

    @property
    def has_index_page(self):
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError('not supported in parquet-cpp')

    @property
    def index_page_offset(self):
        """Unsupported; always raises NotImplementedError."""
        raise NotImplementedError("parquet-cpp doesn't return valid values")

    @property
    def total_compressed_size(self):
        """Size in bytes after compression (int)."""
        return self.metadata.total_compressed_size()

    @property
    def total_uncompressed_size(self):
        """Size in bytes before compression (int)."""
        return self.metadata.total_uncompressed_size()

    @property
    def has_offset_index(self):
        """True if an offset index location is recorded for the chunk."""
        return self.metadata.GetOffsetIndexLocation().has_value()

    @property
    def has_column_index(self):
        """True if a column index location is recorded for the chunk."""
        return self.metadata.GetColumnIndexLocation().has_value()

    @property
    def metadata(self):
        """Extra key/value metadata (dict[bytes, bytes]), or None if absent."""
        cdef:
            unordered_map[c_string, c_string] kv_map
            const CKeyValueMetadata* kv_ptr
        kv_ptr = self.metadata.key_value_metadata().get()
        if kv_ptr == NULL:
            return None
        # Fill the C++ map, which is converted to a Python object on return.
        kv_ptr.ToUnorderedMap(&kv_map)
        return kv_map


cdef class SortingColumn:
    """
    Sorting specification for a single column.

    Returned by :meth:`RowGroupMetaData.sorting_columns` and used in
    :class:`ParquetWriter` to specify the sort order of the data.

    Parameters
    ----------
    column_index : int
        Index of column that data is sorted by.
    descending : bool, default False
        Whether column is sorted in descending order.
    nulls_first : bool, default False
        Whether null values appear before valid values.

    Notes
    -----

    Column indices are zero-based, refer only to leaf fields, and are in
    depth-first order. This may make the column indices for nested schemas
    different from what you expect. In most cases, it will be easier to
    specify the sort order using column names instead of column indices
    and converting using the ``from_ordering`` method.

    Examples
    --------

    In other APIs, sort order is specified by names, such as:

    >>> sort_order = [('id', 'ascending'), ('timestamp', 'descending')]

    For Parquet, the column index must be used instead:

    >>> import pyarrow.parquet as pq
    >>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)]
    [SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)]

    Convert the sort_order into the list of sorting columns with
    ``from_ordering`` (note that the schema must be provided as well):

    >>> import pyarrow as pa
    >>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))])
    >>> sorting_columns = pq.SortingColumn.from_ordering(schema, sort_order)
    >>> sorting_columns
    (SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False))

    Convert back to the sort order with ``to_ordering``:

    >>> pq.SortingColumn.to_ordering(schema, sorting_columns)
    ((('id', 'ascending'), ('timestamp', 'descending')), 'at_end')

    See Also
    --------
    RowGroupMetaData.sorting_columns
    """
    cdef int column_index
    cdef c_bool descending
    cdef c_bool nulls_first

    def __init__(self, int column_index, c_bool descending=False, c_bool nulls_first=False):
        self.column_index = column_index
        self.descending = descending
        self.nulls_first = nulls_first

    @classmethod
    def from_ordering(cls, Schema schema, sort_keys, null_placement='at_end'):
        """
        Create a tuple of SortingColumn objects from the same arguments as
        :class:`pyarrow.compute.SortOptions`.

        Parameters
        ----------
        schema : Schema
            Schema of the input data.
        sort_keys : Sequence of (name, order) tuples
            Names of field/column keys (str) to sort the input on,
            along with the order each field/column is sorted in.
            Accepted values for `order` are "ascending", "descending".
            A bare name (str) is also accepted and means ascending order.
        null_placement : {'at_start', 'at_end'}, default 'at_end'
            Where null values should appear in the sort order.

        Returns
        -------
        sorting_columns : tuple of SortingColumn

        Raises
        ------
        ValueError
            If `null_placement` is not one of the accepted values, if a sort
            key is malformed or has an unknown direction, or if a sort key
            name is not found in the schema.
        """
        if null_placement == 'at_start':
            nulls_first = True
        elif null_placement == 'at_end':
            nulls_first = False
        else:
            raise ValueError('null_placement must be "at_start" or "at_end"')

        # Maps each column name to the index stored in the SortingColumn.
        col_map = _name_to_index_map(schema)

        sorting_columns = []

        for sort_key in sort_keys:
            if isinstance(sort_key, str):
                # A bare name sorts ascending.
                name = sort_key
                descending = False
            elif (isinstance(sort_key, tuple) and len(sort_key) == 2 and
                    isinstance(sort_key[0], str) and
                    isinstance(sort_key[1], str)):
                name, descending = sort_key
                if descending == "descending":
                    descending = True
                elif descending == "ascending":
                    descending = False
                else:
                    raise ValueError("Invalid sort key direction: {0}"
                                     .format(descending))
            else:
                raise ValueError("Invalid sort key: {0}".format(sort_key))

            try:
                column_index = col_map[name]
            except KeyError:
                raise ValueError("Sort key name '{0}' not found in schema:\n{1}"
                                 .format(name, schema))

            sorting_columns.append(
                cls(column_index, descending=descending, nulls_first=nulls_first)
            )

        return tuple(sorting_columns)

    @staticmethod
    def to_ordering(Schema schema, sorting_columns):
        """
        Convert a tuple of SortingColumn objects to the same format as
        :class:`pyarrow.compute.SortOptions`.

        Parameters
        ----------
        schema : Schema
            Schema of the input data.
        sorting_columns : tuple of SortingColumn
            Columns to sort the input on.

        Returns
        -------
        sort_keys : tuple of (name, order) tuples
        null_placement : {'at_start', 'at_end'}

        Raises
        ------
        ValueError
            If the sorting columns do not all share the same null placement.
        """
        # Invert the name -> index map so names can be looked up by index.
        col_map = {i: name for name, i in _name_to_index_map(schema).items()}

        sort_keys = []
        nulls_first = None

        for sorting_column in sorting_columns:
            name = col_map[sorting_column.column_index]
            if sorting_column.descending:
                order = "descending"
            else:
                order = "ascending"
            sort_keys.append((name, order))
            # A single null placement is returned for the whole ordering, so
            # every column has to agree with the first one.
            if nulls_first is None:
                nulls_first = sorting_column.nulls_first
            elif nulls_first != sorting_column.nulls_first:
                raise ValueError("Sorting columns have inconsistent null placement")

        # With no sorting columns ``nulls_first`` stays None -> 'at_end'.
        if nulls_first:
            null_placement = "at_start"
        else:
            null_placement = "at_end"

        return tuple(sort_keys), null_placement

    def __repr__(self):
        return """{}(column_index={}, descending={}, nulls_first={})""".format(
            self.__class__.__name__,
            self.column_index, self.descending, self.nulls_first)

    def __eq__(self, other):
        # Reading the C-level fields is only valid on a real SortingColumn.
        # For any other operand (including None) return NotImplemented, like
        # the other metadata classes in this module, instead of raising
        # TypeError or touching fields the object does not have.
        cdef SortingColumn rhs
        if not isinstance(other, SortingColumn):
            return NotImplemented
        rhs = other
        return (self.column_index == rhs.column_index and
                self.descending == rhs.descending and
                self.nulls_first == rhs.nulls_first)

    def __hash__(self):
        # Built from the same three fields that __eq__ compares.
        return hash((self.column_index, self.descending, self.nulls_first))

    @property
    def column_index(self):
        """Index of column data is sorted by (int)."""
        return self.column_index

    @property
    def descending(self):
        """Whether column is sorted in descending order (bool)."""
        return self.descending

    @property
    def nulls_first(self):
        """Whether null values appear before valid values (bool)."""
        return self.nulls_first

    def to_dict(self):
        """
        Get dictionary representation of the SortingColumn.

        Returns
        -------
        dict
            Dictionary with the keys ``column_index``, ``descending`` and
            ``nulls_first``.
        """
        d = dict(
            column_index=self.column_index,
            descending=self.descending,
            nulls_first=self.nulls_first
        )
        return d


cdef class RowGroupMetaData(_Weakrefable):
    """Metadata describing one row group of a Parquet file."""

    def __cinit__(self, FileMetaData parent, int index):
        if index < 0 or parent.num_row_groups <= index:
            raise IndexError('{0} out of bounds'.format(index))
        # ``up_metadata`` holds the object returned by RowGroup();
        # ``metadata`` is the raw pointer obtained from it.
        self.up_metadata = parent._metadata.RowGroup(index)
        self.metadata = self.up_metadata.get()
        # Parent and index are kept; __reduce__ rebuilds the object from them.
        self.parent = parent
        self.index = index

    def __reduce__(self):
        return RowGroupMetaData, (self.parent, self.index)

    def __eq__(self, other):
        # ``equals`` only accepts RowGroupMetaData; the TypeError raised for
        # any other operand is turned into NotImplemented.
        try:
            are_equal = self.equals(other)
        except TypeError:
            return NotImplemented
        return are_equal

    def equals(self, RowGroupMetaData other):
        """
        Compare this row group metadata with another one.

        Parameters
        ----------
        other : RowGroupMetaData
            The metadata to compare with.

        Returns
        -------
        are_equal : bool
        """
        return self.metadata.Equals(deref(other.metadata))

    def column(self, int i):
        """
        Return the metadata of the column at the given index.

        Parameters
        ----------
        i : int
            Index of the column whose metadata is requested.

        Returns
        -------
        ColumnChunkMetaData
            Metadata of that column within this row group.

        Raises
        ------
        IndexError
            If `i` is negative or not smaller than ``num_columns``.
        """
        if i < 0 or self.num_columns <= i:
            raise IndexError('{0} out of bounds'.format(i))
        column_chunk = ColumnChunkMetaData()
        column_chunk.init(self, i)
        return column_chunk

    def __repr__(self):
        # Default object repr first, then one "  name: value" line per field.
        lines = [
            object.__repr__(self),
            "  num_columns: {0}".format(self.num_columns),
            "  num_rows: {0}".format(self.num_rows),
            "  total_byte_size: {0}".format(self.total_byte_size),
            "  sorting_columns: {0}".format(self.sorting_columns),
        ]
        return "\n".join(lines)

    def to_dict(self):
        """
        Convert the row group metadata into a plain dictionary.

        Returns
        -------
        dict
            Holds ``num_columns``, ``num_rows``, ``total_byte_size``,
            ``columns`` (one dictionary per column chunk) and
            ``sorting_columns`` (one dictionary per sorting column).
        """
        return {
            "num_columns": self.num_columns,
            "num_rows": self.num_rows,
            "total_byte_size": self.total_byte_size,
            "columns": [self.column(i).to_dict()
                        for i in range(self.num_columns)],
            "sorting_columns": [col.to_dict()
                                for col in self.sorting_columns],
        }

    @property
    def num_columns(self):
        """Count of columns in the row group (int)."""
        return self.metadata.num_columns()

    @property
    def num_rows(self):
        """Count of rows in the row group (int)."""
        return self.metadata.num_rows()

    @property
    def total_byte_size(self):
        """Combined byte size of all uncompressed column data in the row group (int)."""
        return self.metadata.total_byte_size()

    @property
    def sorting_columns(self):
        """Columns the row group is sorted by (tuple of :class:`SortingColumn`)."""
        cdef vector[CSortingColumn] c_sorting_columns = self.metadata.sorting_columns()
        converted = []
        for c_col in c_sorting_columns:
            converted.append(SortingColumn(
                c_col.column_idx,
                c_col.descending,
                c_col.nulls_first
            ))
        return tuple(converted)


def _reconstruct_filemetadata(Buffer serialized):
    """
    Rebuild a FileMetaData object from its serialized bytes.

    This is the callable that ``FileMetaData.__reduce__`` hands to pickle.

    Parameters
    ----------
    serialized : Buffer
        Buffer holding the bytes written by the metadata's ``WriteTo``.

    Returns
    -------
    FileMetaData
    """
    cdef:
        FileMetaData restored = FileMetaData.__new__(FileMetaData)
        CBuffer *raw = serialized.buffer.get()
        # The length is passed by pointer to the C++ factory below.
        uint32_t n_bytes = <uint32_t>raw.size()

    restored.init(CFileMetaData_Make(raw.data(), &n_bytes))

    return restored


cdef class FileMetaData(_Weakrefable):
    """Parquet metadata for a single file."""

    def __cinit__(self):
        pass

    def __reduce__(self):
        # Pickle support: serialize the metadata into an in-memory buffer and
        # let _reconstruct_filemetadata rebuild the object from those bytes.
        cdef:
            NativeFile sink = BufferOutputStream()
            COutputStream* c_sink = sink.get_output_stream().get()
        with nogil:
            self._metadata.WriteTo(c_sink)

        cdef Buffer buffer = sink.getvalue()
        return _reconstruct_filemetadata, (buffer,)

    def __hash__(self):
        return hash((self.schema,
                     self.num_rows,
                     self.num_row_groups,
                     self.format_version,
                     self.serialized_size))

    def __repr__(self):
        return """{0}
  created_by: {1}
  num_columns: {2}
  num_rows: {3}
  num_row_groups: {4}
  format_version: {5}
  serialized_size: {6}""".format(object.__repr__(self),
                                 self.created_by, self.num_columns,
                                 self.num_rows, self.num_row_groups,
                                 self.format_version,
                                 self.serialized_size)

    def to_dict(self):
        """
        Get dictionary representation of the file metadata.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class, plus a
            ``row_groups`` list holding one dictionary per row group.
        """
        row_groups = []
        d = dict(
            created_by=self.created_by,
            num_columns=self.num_columns,
            num_rows=self.num_rows,
            num_row_groups=self.num_row_groups,
            row_groups=row_groups,
            format_version=self.format_version,
            serialized_size=self.serialized_size
        )
        # `row_groups` is the same list object stored in `d`, so appending
        # here fills in the dictionary entry.
        for i in range(self.num_row_groups):
            row_groups.append(self.row_group(i).to_dict())
        return d

    def __eq__(self, other):
        # `equals` rejects non-FileMetaData arguments (including None) with a
        # TypeError; translate that into NotImplemented so Python can fall
        # back to the reflected comparison.
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, FileMetaData other not None):
        """
        Return whether the two file metadata objects are equal.

        Parameters
        ----------
        other : FileMetaData
            Metadata to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self._metadata.Equals(deref(other._metadata))

    @property
    def schema(self):
        """Schema of the file (:class:`ParquetSchema`)."""
        # Built on first access and cached on the instance afterwards.
        if self._schema is None:
            self._schema = ParquetSchema(self)
        return self._schema

    @property
    def serialized_size(self):
        """Size of the original thrift encoded metadata footer (int)."""
        return self._metadata.size()

    @property
    def num_columns(self):
        """Number of columns in file (int)."""
        return self._metadata.num_columns()

    @property
    def num_rows(self):
        """Total number of rows in file (int)."""
        return self._metadata.num_rows()

    @property
    def num_row_groups(self):
        """Number of row groups in file (int)."""
        return self._metadata.num_row_groups()

    @property
    def format_version(self):
        """
        Parquet format version used in file (str, such as '1.0', '2.4').

        If version is missing or unparsable, will default to assuming '2.6'.
        """
        cdef ParquetVersion version = self._metadata.version()
        if version == ParquetVersion_V1:
            return '1.0'
        elif version == ParquetVersion_V2_0:
            return 'pseudo-2.0'
        elif version == ParquetVersion_V2_4:
            return '2.4'
        elif version == ParquetVersion_V2_6:
            return '2.6'
        else:
            warnings.warn('Unrecognized file version, assuming 2.6: {}'
                          .format(version))
            return '2.6'

    @property
    def created_by(self):
        """
        String describing source of the parquet file (str).

        This typically includes library name and version number. For example, Arrow 7.0's
        writer returns 'parquet-cpp-arrow version 7.0.0'.
        """
        return frombytes(self._metadata.created_by())

    @property
    def metadata(self):
        """Additional metadata as key value pairs (dict[bytes, bytes]).

        None is returned when the file carries no key-value metadata.
        """
        cdef:
            unordered_map[c_string, c_string] metadata
            const CKeyValueMetadata* underlying_metadata
        underlying_metadata = self._metadata.key_value_metadata().get()
        if underlying_metadata != NULL:
            underlying_metadata.ToUnorderedMap(&metadata)
            return metadata
        else:
            return None

    def row_group(self, int i):
        """
        Get metadata for row group at index i.

        Parameters
        ----------
        i : int
            Row group index to get.

        Returns
        -------
        row_group_metadata : RowGroupMetaData
        """
        return RowGroupMetaData(self, i)

    def set_file_path(self, path):
        """
        Set ColumnChunk file paths to the given value.

        This method modifies the ``file_path`` field of each ColumnChunk
        in the FileMetaData to be a particular value.

        Parameters
        ----------
        path : str
            The file path to set on all ColumnChunks.
        """
        cdef:
            c_string c_path = tobytes(path)
        self._metadata.set_file_path(c_path)

    def append_row_groups(self, FileMetaData other not None):
        """
        Append row groups from other FileMetaData object.

        Parameters
        ----------
        other : FileMetaData
            Other metadata to append row groups from. Must not be None;
            passing None raises TypeError.
        """
        cdef shared_ptr[CFileMetaData] c_metadata

        # `other` is guaranteed non-None by the signature, so reading its
        # C-level attribute below is safe.
        c_metadata = other.sp_metadata
        self._metadata.AppendRowGroups(deref(c_metadata))

    def write_metadata_file(self, where):
        """
        Write the metadata to a metadata-only Parquet file.

        Parameters
        ----------
        where : path or file-like object
            Where to write the metadata.  Should be a writable path on
            the local filesystem, or a writable file-like object.
        """
        cdef:
            shared_ptr[COutputStream] sink
            c_string c_where

        try:
            where = _stringify_path(where)
        except TypeError:
            # Not path-like: treat `where` as a file-like object.
            get_writer(where, &sink)
        else:
            c_where = tobytes(where)
            with nogil:
                sink = GetResultValue(FileOutputStream.Open(c_where))

        with nogil:
            check_status(
                WriteMetaDataFile(deref(self._metadata), sink.get()))


cdef class ParquetSchema(_Weakrefable):
    """A Parquet schema."""

    def __cinit__(self, FileMetaData container not None):
        # Remember the owning FileMetaData (used again by __reduce__ and
        # to_arrow_schema) together with the schema descriptor taken from it.
        # `not None` turns an accidental ParquetSchema(None) into a TypeError
        # instead of an unchecked attribute access on None.
        self.parent = container
        self.schema = container._metadata.schema()

    def __repr__(self):
        return "{0}\n{1}".format(
            object.__repr__(self),
            frombytes(self.schema.ToString(), safe=True))

    def __reduce__(self):
        # Pickle by re-wrapping the (picklable) parent FileMetaData.
        return ParquetSchema, (self.parent,)

    def __len__(self):
        return self.schema.num_columns()

    def __getitem__(self, i):
        return self.column(i)

    def __hash__(self):
        return hash(self.schema.ToString())

    @property
    def names(self):
        """Name of each field (list of str)."""
        return [self[i].name for i in range(len(self))]

    def to_arrow_schema(self):
        """
        Convert Parquet schema to effective Arrow schema.

        Returns
        -------
        schema : Schema
        """
        cdef shared_ptr[CSchema] sp_arrow_schema

        with nogil:
            check_status(FromParquetSchema(
                self.schema, default_arrow_reader_properties(),
                self.parent._metadata.key_value_metadata(),
                &sp_arrow_schema))

        return pyarrow_wrap_schema(sp_arrow_schema)

    def __eq__(self, other):
        # `equals` raises TypeError for anything that is not a ParquetSchema
        # (including None); report that as NotImplemented.
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, ParquetSchema other not None):
        """
        Return whether the two schemas are equal.

        Parameters
        ----------
        other : ParquetSchema
            Schema to compare against. Must not be None; passing None
            raises TypeError.

        Returns
        -------
        are_equal : bool
        """
        return self.schema.Equals(deref(other.schema))

    def column(self, i):
        """
        Return the schema for a single column.

        Parameters
        ----------
        i : int
            Index of column in schema.

        Returns
        -------
        column_schema : ColumnSchema

        Raises
        ------
        IndexError
            If ``i`` is negative or not smaller than the number of columns.
        """
        if i < 0 or i >= len(self):
            raise IndexError('{0} out of bounds'.format(i))

        return ColumnSchema(self, i)


cdef class ColumnSchema(_Weakrefable):
    """Schema for a single column."""
    cdef:
        int index
        ParquetSchema parent
        const ColumnDescriptor* descr

    def __cinit__(self, ParquetSchema schema not None, int index):
        # Same bounds check (and message) as ParquetSchema.column, so that
        # direct construction cannot pass an unchecked index to the
        # descriptor lookup below.
        if index < 0 or index >= schema.schema.num_columns():
            raise IndexError('{0} out of bounds'.format(index))
        self.parent = schema
        self.index = index  # for pickling support
        self.descr = schema.schema.Column(index)

    def __eq__(self, other):
        # `equals` raises TypeError for anything that is not a ColumnSchema
        # (including None); report that as NotImplemented.
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def __reduce__(self):
        return ColumnSchema, (self.parent, self.index)

    def equals(self, ColumnSchema other not None):
        """
        Return whether the two column schemas are equal.

        Parameters
        ----------
        other : ColumnSchema
            Schema to compare against. Must not be None; passing None
            raises TypeError.

        Returns
        -------
        are_equal : bool
        """
        return self.descr.Equals(deref(other.descr))

    def __repr__(self):
        physical_type = self.physical_type
        converted_type = self.converted_type
        # Decorate the legacy converted type with its parameters where the
        # column carries them.
        if converted_type == 'DECIMAL':
            converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
                                                        self.scale)
        elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
            converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
                              .format(self.length))

        return """<ParquetColumnSchema>
  name: {0}
  path: {1}
  max_definition_level: {2}
  max_repetition_level: {3}
  physical_type: {4}
  logical_type: {5}
  converted_type (legacy): {6}""".format(self.name, self.path,
                                         self.max_definition_level,
                                         self.max_repetition_level,
                                         physical_type,
                                         str(self.logical_type),
                                         converted_type)

    @property
    def name(self):
        """Name of field (str)."""
        return frombytes(self.descr.name())

    @property
    def path(self):
        """Nested path to field, separated by periods (str)."""
        return frombytes(self.descr.path().get().ToDotString())

    @property
    def max_definition_level(self):
        """Maximum definition level (int)."""
        return self.descr.max_definition_level()

    @property
    def max_repetition_level(self):
        """Maximum repetition level (int)."""
        return self.descr.max_repetition_level()

    @property
    def physical_type(self):
        """Name of physical type (str)."""
        return physical_type_name_from_enum(self.descr.physical_type())

    @property
    def logical_type(self):
        """Logical type of column (:class:`ParquetLogicalType`)."""
        return wrap_logical_type(self.descr.logical_type())

    @property
    def converted_type(self):
        """Legacy converted type (str)."""
        return converted_type_name_from_enum(self.descr.converted_type())

    # FIXED_LEN_BYTE_ARRAY attribute
    @property
    def length(self):
        """
        Array length for fixed length byte array columns (int).

        The descriptor's ``type_length()`` value is returned unchanged for
        every column; this accessor does not substitute None for other types.
        """
        return self.descr.type_length()

    # Decimal attributes
    @property
    def precision(self):
        """
        Precision for decimal columns (int).

        The descriptor's ``type_precision()`` value is returned unchanged for
        every column; this accessor does not substitute None for other types.
        """
        return self.descr.type_precision()

    @property
    def scale(self):
        """
        Scale for decimal columns (int).

        The descriptor's ``type_scale()`` value is returned unchanged for
        every column; this accessor does not substitute None for other types.
        """
        return self.descr.type_scale()


cdef physical_type_name_from_enum(ParquetType type_):
    """Map a Parquet physical type enum to its name, or 'UNKNOWN' if unlisted."""
    names = {
        ParquetType_BOOLEAN: 'BOOLEAN',
        ParquetType_INT32: 'INT32',
        ParquetType_INT64: 'INT64',
        ParquetType_INT96: 'INT96',
        ParquetType_FLOAT: 'FLOAT',
        ParquetType_DOUBLE: 'DOUBLE',
        ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
        ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
    }
    return names.get(type_, 'UNKNOWN')


cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
    """Map a Parquet logical type id to its name, or 'UNKNOWN' if unlisted."""
    names = {
        ParquetLogicalType_UNDEFINED: 'UNDEFINED',
        ParquetLogicalType_STRING: 'STRING',
        ParquetLogicalType_MAP: 'MAP',
        ParquetLogicalType_LIST: 'LIST',
        ParquetLogicalType_ENUM: 'ENUM',
        ParquetLogicalType_DECIMAL: 'DECIMAL',
        ParquetLogicalType_DATE: 'DATE',
        ParquetLogicalType_TIME: 'TIME',
        ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
        ParquetLogicalType_INT: 'INT',
        ParquetLogicalType_FLOAT16: 'FLOAT16',
        ParquetLogicalType_JSON: 'JSON',
        ParquetLogicalType_BSON: 'BSON',
        ParquetLogicalType_UUID: 'UUID',
        ParquetLogicalType_NONE: 'NONE',
    }
    return names.get(type_, 'UNKNOWN')


cdef converted_type_name_from_enum(ParquetConvertedType type_):
    """Map a legacy Parquet converted type enum to its name, or 'UNKNOWN' if unlisted."""
    names = {
        ParquetConvertedType_NONE: 'NONE',
        ParquetConvertedType_UTF8: 'UTF8',
        ParquetConvertedType_MAP: 'MAP',
        ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
        ParquetConvertedType_LIST: 'LIST',
        ParquetConvertedType_ENUM: 'ENUM',
        ParquetConvertedType_DECIMAL: 'DECIMAL',
        ParquetConvertedType_DATE: 'DATE',
        ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
        ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
        ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
        ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
        ParquetConvertedType_UINT_8: 'UINT_8',
        ParquetConvertedType_UINT_16: 'UINT_16',
        ParquetConvertedType_UINT_32: 'UINT_32',
        ParquetConvertedType_UINT_64: 'UINT_64',
        ParquetConvertedType_INT_8: 'INT_8',
        ParquetConvertedType_INT_16: 'INT_16',
        ParquetConvertedType_INT_32: 'INT_32',
        ParquetConvertedType_INT_64: 'INT_64',
        ParquetConvertedType_JSON: 'JSON',
        ParquetConvertedType_BSON: 'BSON',
        ParquetConvertedType_INTERVAL: 'INTERVAL',
    }
    return names.get(type_, 'UNKNOWN')


cdef encoding_name_from_enum(ParquetEncoding encoding_):
    """Map a Parquet encoding enum to its name, or 'UNKNOWN' if unlisted."""
    names = {
        ParquetEncoding_PLAIN: 'PLAIN',
        ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
        ParquetEncoding_RLE: 'RLE',
        ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
        ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
        ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
        ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
        ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
        ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
    }
    return names.get(encoding_, 'UNKNOWN')


cdef encoding_enum_from_name(str encoding_name):
    """
    Translate a column encoding name into its Parquet encoding enum value.

    Raises ValueError for the two dictionary encodings (which cannot be
    requested explicitly through this function) and for any name that is
    not in the table below.
    """
    # Dictionary encodings are rejected with their own message.
    if encoding_name in ('RLE_DICTIONARY', 'PLAIN_DICTIONARY'):
        raise ValueError(f"{encoding_name!r} is already used by default.")

    selectable = {
        'PLAIN': ParquetEncoding_PLAIN,
        'BIT_PACKED': ParquetEncoding_BIT_PACKED,
        'RLE': ParquetEncoding_RLE,
        'BYTE_STREAM_SPLIT': ParquetEncoding_BYTE_STREAM_SPLIT,
        'DELTA_BINARY_PACKED': ParquetEncoding_DELTA_BINARY_PACKED,
        'DELTA_LENGTH_BYTE_ARRAY': ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY,
        'DELTA_BYTE_ARRAY': ParquetEncoding_DELTA_BYTE_ARRAY,
    }
    found = selectable.get(encoding_name, None)
    if found is None:
        raise ValueError(f"Unsupported column encoding: {encoding_name!r}")
    return found


cdef compression_name_from_enum(ParquetCompression compression_):
    """Map a Parquet compression enum to its name, or 'UNKNOWN' if unlisted."""
    names = {
        ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
        ParquetCompression_SNAPPY: 'SNAPPY',
        ParquetCompression_GZIP: 'GZIP',
        ParquetCompression_LZO: 'LZO',
        ParquetCompression_BROTLI: 'BROTLI',
        ParquetCompression_LZ4: 'LZ4',
        ParquetCompression_ZSTD: 'ZSTD',
    }
    return names.get(compression_, 'UNKNOWN')


cdef int check_compression_name(name) except -1:
    """
    Validate a compression codec name (case-insensitive).

    Returns 0 when the name is accepted and raises ArrowException otherwise.
    """
    accepted = {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD'}
    if name.upper() in accepted:
        return 0
    raise ArrowException("Unsupported compression: " + name)


cdef ParquetCompression compression_from_name(name):
    """
    Translate a codec name (case-insensitive) into a ParquetCompression value.

    Any name not matched below maps to ParquetCompression_UNCOMPRESSED.
    """
    codec = name.upper()
    if codec == 'SNAPPY':
        return ParquetCompression_SNAPPY
    if codec == 'GZIP':
        return ParquetCompression_GZIP
    if codec == 'LZO':
        return ParquetCompression_LZO
    if codec == 'BROTLI':
        return ParquetCompression_BROTLI
    if codec == 'LZ4':
        return ParquetCompression_LZ4
    if codec == 'ZSTD':
        return ParquetCompression_ZSTD
    # Fallback for every other name.
    return ParquetCompression_UNCOMPRESSED


cdef class ParquetReader(_Weakrefable):
    """Reader for a single Parquet file, wrapping the C++ ``FileReader``."""
    cdef:
        object source
        CMemoryPool* pool
        UniquePtrNoGIL[FileReader] reader
        FileMetaData _metadata
        shared_ptr[CRandomAccessFile] rd_handle

    cdef public:
        _column_idx_map

    def __cinit__(self, MemoryPool memory_pool=None):
        self.pool = maybe_unbox_memory_pool(memory_pool)
        self._metadata = None

    def open(self, object source not None, *, bint use_memory_map=False,
             read_dictionary=None, FileMetaData metadata=None,
             int buffer_size=0, bint pre_buffer=False,
             coerce_int96_timestamp_unit=None,
             FileDecryptionProperties decryption_properties=None,
             thrift_string_size_limit=None,
             thrift_container_size_limit=None,
             page_checksum_verification=False):
        """
        Open a parquet file for reading.

        Parameters
        ----------
        source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
        use_memory_map : bool, default False
        read_dictionary : iterable[int or str], optional
        metadata : FileMetaData, optional
        buffer_size : int, default 0
        pre_buffer : bool, default False
        coerce_int96_timestamp_unit : str, optional
        decryption_properties : FileDecryptionProperties, optional
        thrift_string_size_limit : int, optional
        thrift_container_size_limit : int, optional
        page_checksum_verification : bool, default False

        Raises
        ------
        ValueError
            If ``buffer_size`` is negative, or if either thrift size limit
            is given and is not positive.
        """
        cdef:
            shared_ptr[CFileMetaData] c_metadata
            CReaderProperties properties = default_reader_properties()
            ArrowReaderProperties arrow_props = (
                default_arrow_reader_properties())
            FileReaderBuilder builder

        # Pre-buffering is silently switched off when threading is disabled.
        if pre_buffer and not is_threading_enabled():
            pre_buffer = False

        if metadata is not None:
            c_metadata = metadata.sp_metadata

        if buffer_size > 0:
            properties.enable_buffered_stream()
            properties.set_buffer_size(buffer_size)
        elif buffer_size == 0:
            properties.disable_buffered_stream()
        else:
            raise ValueError('Buffer size must be larger than zero')

        if thrift_string_size_limit is not None:
            if thrift_string_size_limit <= 0:
                raise ValueError("thrift_string_size_limit "
                                 "must be larger than zero")
            properties.set_thrift_string_size_limit(thrift_string_size_limit)
        if thrift_container_size_limit is not None:
            if thrift_container_size_limit <= 0:
                raise ValueError("thrift_container_size_limit "
                                 "must be larger than zero")
            properties.set_thrift_container_size_limit(
                thrift_container_size_limit)

        if decryption_properties is not None:
            properties.file_decryption_properties(
                decryption_properties.unwrap())

        arrow_props.set_pre_buffer(pre_buffer)

        properties.set_page_checksum_verification(page_checksum_verification)

        if coerce_int96_timestamp_unit is None:
            # use the default defined in default_arrow_reader_properties()
            pass
        else:
            arrow_props.set_coerce_int96_timestamp_unit(
                string_to_timeunit(coerce_int96_timestamp_unit))

        self.source = source
        get_reader(source, use_memory_map, &self.rd_handle)

        with nogil:
            check_status(builder.Open(self.rd_handle, properties, c_metadata))

        # Set up metadata
        with nogil:
            c_metadata = builder.raw_reader().metadata()
        self._metadata = result = FileMetaData()
        result.init(c_metadata)

        # Column names in `read_dictionary` are resolved through the metadata
        # set up just above, so this must come after it.
        if read_dictionary is not None:
            self._set_read_dictionary(read_dictionary, &arrow_props)

        with nogil:
            check_status(builder.memory_pool(self.pool)
                         .properties(arrow_props)
                         .Build(&self.reader))

    cdef _set_read_dictionary(self, read_dictionary,
                              ArrowReaderProperties* props):
        # Entries may be integer column indices or column names; names are
        # translated to indices first.
        for column in read_dictionary:
            if not isinstance(column, int):
                column = self.column_name_idx(column)
            props.set_read_dictionary(column, True)

    @property
    def column_paths(self):
        """Path components of every column, in schema order (list of list of str)."""
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            vector[c_string] path
            int i = 0

        paths = []
        for i in range(0, metadata.num_columns()):
            path = (metadata.schema().Column(i)
                    .path().get().ToDotVector())
            paths.append([frombytes(x) for x in path])

        return paths

    @property
    def metadata(self):
        """Metadata of the opened file (FileMetaData), or None before ``open``."""
        return self._metadata

    @property
    def schema_arrow(self):
        """Arrow schema of the file as reported by the reader (Schema)."""
        cdef shared_ptr[CSchema] out
        with nogil:
            check_status(self.reader.get().GetSchema(&out))
        return pyarrow_wrap_schema(out)

    @property
    def num_row_groups(self):
        """Number of row groups reported by the reader (int)."""
        return self.reader.get().num_row_groups()

    def set_use_threads(self, bint use_threads):
        """
        Parameters
        ----------
        use_threads : bool
            Ignored (treated as False) when threading is not enabled.
        """
        if is_threading_enabled():
            self.reader.get().set_use_threads(use_threads)
        else:
            self.reader.get().set_use_threads(False)

    def set_batch_size(self, int64_t batch_size):
        """
        Parameters
        ----------
        batch_size : int64
        """
        self.reader.get().set_batch_size(batch_size)

    def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
                     bint use_threads=True):
        """
        Parameters
        ----------
        batch_size : int64
        row_groups : list[int]
        column_indices : list[int], optional
        use_threads : bool, default True

        Yields
        ------
        next : RecordBatch
        """
        cdef:
            vector[int] c_row_groups
            vector[int] c_column_indices
            shared_ptr[CRecordBatch] record_batch
            UniquePtrNoGIL[CRecordBatchReader] recordbatchreader

        self.set_batch_size(batch_size)

        # Always apply the requested threading mode, exactly like
        # read_row_groups / read_all do. Only setting it when True would let
        # a previous call's use_threads=True leak into a later
        # use_threads=False iteration on the same reader.
        self.set_use_threads(use_threads)

        for row_group in row_groups:
            c_row_groups.push_back(row_group)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)
            with nogil:
                recordbatchreader = GetResultValue(
                    self.reader.get().GetRecordBatchReader(
                        c_row_groups, c_column_indices
                    )
                )
        else:
            with nogil:
                recordbatchreader = GetResultValue(
                    self.reader.get().GetRecordBatchReader(
                        c_row_groups
                    )
                )

        while True:
            with nogil:
                check_status(
                    recordbatchreader.get().ReadNext(&record_batch)
                )
            # A null batch marks the end of the stream.
            if record_batch.get() == NULL:
                break

            yield pyarrow_wrap_batch(record_batch)

    def read_row_group(self, int i, column_indices=None,
                       bint use_threads=True):
        """
        Parameters
        ----------
        i : int
        column_indices : list[int], optional
        use_threads : bool, default True

        Returns
        -------
        table : pyarrow.Table
        """
        return self.read_row_groups([i], column_indices, use_threads)

    def read_row_groups(self, row_groups not None, column_indices=None,
                        bint use_threads=True):
        """
        Parameters
        ----------
        row_groups : list[int]
        column_indices : list[int], optional
        use_threads : bool, default True

        Returns
        -------
        table : pyarrow.Table
        """
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_row_groups
            vector[int] c_column_indices

        self.set_use_threads(use_threads)

        for row_group in row_groups:
            c_row_groups.push_back(row_group)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroups(c_row_groups, c_column_indices,
                                            &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroups(c_row_groups, &ctable))
        return pyarrow_wrap_table(ctable)

    def read_all(self, column_indices=None, bint use_threads=True):
        """
        Parameters
        ----------
        column_indices : list[int], optional
        use_threads : bool, default True

        Returns
        -------
        table : pyarrow.Table
        """
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_column_indices

        self.set_use_threads(use_threads)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadTable(c_column_indices, &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadTable(&ctable))
        return pyarrow_wrap_table(ctable)

    def scan_contents(self, column_indices=None, batch_size=65536):
        """
        Parameters
        ----------
        column_indices : list[int], optional
        batch_size : int32, default 65536

        Returns
        -------
        num_rows : int64
        """
        cdef:
            vector[int] c_column_indices
            int32_t c_batch_size
            int64_t c_num_rows

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

        c_batch_size = batch_size

        with nogil:
            check_status(self.reader.get()
                         .ScanContents(c_column_indices, c_batch_size,
                                       &c_num_rows))

        return c_num_rows

    def column_name_idx(self, column_name):
        """
        Find the index of a column by its name.

        Parameters
        ----------
        column_name : str
            Name of the column; separation of nesting levels is done via ".".

        Returns
        -------
        column_idx : int
            Integer index of the column in the schema.

        Raises
        ------
        KeyError
            If no column has the given dotted path.
        """
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            int i = 0

        # The path -> index map is built once, on first use, and then reused.
        if self._column_idx_map is None:
            self._column_idx_map = {}
            for i in range(0, metadata.num_columns()):
                col_bytes = tobytes(metadata.schema().Column(i)
                                    .path().get().ToDotString())
                self._column_idx_map[col_bytes] = i

        return self._column_idx_map[tobytes(column_name)]

    def read_column(self, int column_index):
        """
        Read the column at the specified index.

        Parameters
        ----------
        column_index : int
            Index of the column.

        Returns
        -------
        column : pyarrow.ChunkedArray
        """
        cdef shared_ptr[CChunkedArray] out
        with nogil:
            check_status(self.reader.get()
                         .ReadColumn(column_index, &out))
        return pyarrow_wrap_chunked_array(out)

    def close(self):
        """Close the underlying file handle if it is still open."""
        if not self.closed:
            with nogil:
                check_status(self.rd_handle.get().Close())

    @property
    def closed(self):
        """Whether the underlying file handle is closed or was never opened (bool)."""
        if self.rd_handle == NULL:
            return True
        with nogil:
            closed = self.rd_handle.get().closed()
        return closed


cdef CSortingColumn _convert_sorting_column(SortingColumn sorting_column):
    """Copy the fields of a Python SortingColumn into a C-level CSortingColumn."""
    cdef CSortingColumn c_col

    c_col.column_idx = sorting_column.column_index
    c_col.descending = sorting_column.descending
    c_col.nulls_first = sorting_column.nulls_first

    return c_col


cdef vector[CSortingColumn] _convert_sorting_columns(sorting_columns) except *:
    """
    Convert a sequence of SortingColumn objects into a C++ vector.

    Raises ValueError unless ``sorting_columns`` is a Sequence whose items
    are all SortingColumn instances.
    """
    cdef vector[CSortingColumn] c_converted

    # Reject non-sequences first; element types are only inspected when the
    # container itself is acceptable.
    if (not isinstance(sorting_columns, Sequence)
            or not all(isinstance(col, SortingColumn)
                       for col in sorting_columns)):
        raise ValueError(
            "'sorting_columns' must be a list of `SortingColumn`")

    for item in sorting_columns:
        c_converted.push_back(_convert_sorting_column(item))

    return c_converted


cdef shared_ptr[WriterProperties] _create_writer_properties(
        use_dictionary=None,
        compression=None,
        version=None,
        write_statistics=None,
        data_page_size=None,
        compression_level=None,
        use_byte_stream_split=False,
        column_encoding=None,
        data_page_version=None,
        FileEncryptionProperties encryption_properties=None,
        write_batch_size=None,
        dictionary_pagesize_limit=None,
        write_page_index=False,
        write_page_checksum=False,
        sorting_columns=None,
        store_decimal_as_integer=False) except *:

    """
    Build the general Parquet ``WriterProperties`` from Python-level options.

    Options left as None are not forwarded to the underlying builder.

    Parameters
    ----------
    use_dictionary : bool or iterable of column names, optional
        A bool enables/disables dictionary encoding for all columns. Any
        other non-None value disables it globally and re-enables it for
        each listed column.
    compression : str or mapping of column name to codec name, optional
        A single codec name for all columns, or a per-column mapping.
    version : {"1.0", "2.0", "pseudo-2.0", "2.4", "2.6"}, optional
        Parquet format version. "2.0" / "pseudo-2.0" emit a FutureWarning.
    write_statistics : bool or iterable of column names, optional
        A bool enables/disables statistics for all columns. Any other
        non-None value disables them globally and re-enables them for each
        listed column.
    data_page_size, write_batch_size, dictionary_pagesize_limit : optional
        Forwarded unchanged to the builder when not None.
    compression_level : int or mapping of column name to level, optional
        A single level for all columns, or a per-column mapping.
    use_byte_stream_split : bool or iterable of column names
        True selects BYTE_STREAM_SPLIT as the global encoding. An iterable
        requests it for the listed columns only (implemented by adding
        entries to a private copy of ``column_encoding``).
    column_encoding : dict or str, optional
        Per-column encoding names, or a single encoding name for all columns.
    data_page_version : {"1.0", "2.0"}, optional
        Data page format version.
    encryption_properties : FileEncryptionProperties, optional
        Unwrapped and passed to the builder when not None.
    write_page_index, write_page_checksum : bool
        Evaluated for truthiness; the matching builder feature is explicitly
        enabled or disabled.
    sorting_columns : sequence of SortingColumn, optional
        Converted with ``_convert_sorting_columns`` and set on the builder.
    store_decimal_as_integer : bool
        Must be a real bool.

    Raises
    ------
    ValueError
        For an unsupported ``version`` / ``data_page_version``, or when
        ``column_encoding`` conflicts with ``use_dictionary`` or
        ``use_byte_stream_split``.
    TypeError
        If ``store_decimal_as_integer`` is not a bool, or ``column_encoding``
        is neither a dict nor a str.
    """
    cdef:
        shared_ptr[WriterProperties] properties
        WriterProperties.Builder props

    # data_page_version

    if data_page_version is not None:
        if data_page_version == "1.0":
            props.data_page_version(ParquetDataPageVersion_V1)
        elif data_page_version == "2.0":
            props.data_page_version(ParquetDataPageVersion_V2)
        else:
            raise ValueError("Unsupported Parquet data page version: {0}"
                             .format(data_page_version))

    # version

    if version is not None:
        if version == "1.0":
            props.version(ParquetVersion_V1)
        elif version in ("2.0", "pseudo-2.0"):
            warnings.warn(
                "Parquet format '2.0' pseudo version is deprecated, use "
                "'2.4' or '2.6' for fine-grained feature selection",
                FutureWarning, stacklevel=2)
            props.version(ParquetVersion_V2_0)
        elif version == "2.4":
            props.version(ParquetVersion_V2_4)
        elif version == "2.6":
            props.version(ParquetVersion_V2_6)
        else:
            raise ValueError("Unsupported Parquet format version: {0}"
                             .format(version))

    # compression

    if isinstance(compression, str):
        check_compression_name(compression)
        props.compression(compression_from_name(compression))
    elif compression is not None:
        # Per-column codecs; ``items()`` works for any mapping on Python 3
        for column, codec in compression.items():
            check_compression_name(codec)
            props.compression(tobytes(column), compression_from_name(codec))

    if isinstance(compression_level, int):
        props.compression_level(compression_level)
    elif compression_level is not None:
        for column, level in compression_level.items():
            props.compression_level(tobytes(column), level)

    # use_dictionary

    if isinstance(use_dictionary, bool):
        if use_dictionary:
            props.enable_dictionary()
            if column_encoding is not None:
                raise ValueError(
                    "To use 'column_encoding' set 'use_dictionary' to False")
        else:
            props.disable_dictionary()
    elif use_dictionary is not None:
        # Deactivate dictionary encoding by default
        props.disable_dictionary()
        for column in use_dictionary:
            props.enable_dictionary(tobytes(column))
            if (column_encoding is not None and
                    column_encoding.get(column) is not None):
                raise ValueError(
                    "To use 'column_encoding' set 'use_dictionary' to False")

    # write_statistics

    if isinstance(write_statistics, bool):
        if write_statistics:
            props.enable_statistics()
        else:
            props.disable_statistics()
    elif write_statistics is not None:
        # Deactivate statistics by default and enable for specified columns
        props.disable_statistics()
        for column in write_statistics:
            props.enable_statistics(tobytes(column))

    # sorting_columns

    if sorting_columns is not None:
        props.set_sorting_columns(_convert_sorting_columns(sorting_columns))

    # use_byte_stream_split

    if isinstance(use_byte_stream_split, bool):
        if use_byte_stream_split:
            if column_encoding is not None:
                raise ValueError(
                    "'use_byte_stream_split' cannot be passed "
                    "together with 'column_encoding'")
            else:
                props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
    elif use_byte_stream_split is not None:
        if isinstance(column_encoding, dict):
            # Work on a copy: the entries added below must not leak into
            # the dict object owned by the caller.
            column_encoding = dict(column_encoding)
        for column in use_byte_stream_split:
            if column_encoding is None:
                column_encoding = {column: 'BYTE_STREAM_SPLIT'}
            elif column_encoding.get(column, None) is None:
                column_encoding[column] = 'BYTE_STREAM_SPLIT'
            else:
                raise ValueError(
                    "'use_byte_stream_split' cannot be passed "
                    "together with 'column_encoding'")

    # store_decimal_as_integer

    if isinstance(store_decimal_as_integer, bool):
        if store_decimal_as_integer:
            props.enable_store_decimal_as_integer()
        else:
            props.disable_store_decimal_as_integer()
    else:
        raise TypeError("'store_decimal_as_integer' must be a boolean")

    # column_encoding
    # encoding map - encode individual columns

    if column_encoding is not None:
        if isinstance(column_encoding, dict):
            for column, _encoding in column_encoding.items():
                props.encoding(tobytes(column),
                               encoding_enum_from_name(_encoding))
        elif isinstance(column_encoding, str):
            props.encoding(encoding_enum_from_name(column_encoding))
        else:
            raise TypeError(
                "'column_encoding' should be a dictionary or a string")

    if data_page_size is not None:
        props.data_pagesize(data_page_size)

    if write_batch_size is not None:
        props.write_batch_size(write_batch_size)

    if dictionary_pagesize_limit is not None:
        props.dictionary_pagesize_limit(dictionary_pagesize_limit)

    # encryption

    if encryption_properties is not None:
        props.encryption(
            (<FileEncryptionProperties>encryption_properties).unwrap())

    # For backwards compatibility reasons we cap the maximum row group size
    # at 64Mi rows.  This could be changed in the future, though it would be
    # a breaking change.
    #
    # The user can always specify a smaller row group size (and the default
    # is smaller) when calling write_table.  If the call to write_table uses
    # a size larger than this then it will be latched to this value.
    props.max_row_group_length(_MAX_ROW_GROUP_SIZE)

    # checksum

    if write_page_checksum:
        props.enable_page_checksum()
    else:
        props.disable_page_checksum()

    # page index

    if write_page_index:
        props.enable_write_page_index()
    else:
        props.disable_write_page_index()

    properties = props.build()

    return properties


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
        use_deprecated_int96_timestamps=False,
        coerce_timestamps=None,
        allow_truncated_timestamps=False,
        writer_engine_version=None,
        use_compliant_nested_type=True,
        store_schema=True) except *:
    """
    Build the Arrow-specific ``ArrowWriterProperties`` from Python options.

    Parameters
    ----------
    use_deprecated_int96_timestamps : bool
        Evaluated for truthiness; explicitly enables or disables INT96
        timestamps on the builder.
    coerce_timestamps : {"ms", "us"}, optional
        Timestamp unit to coerce to. None leaves the builder untouched.
    allow_truncated_timestamps : bool
        Evaluated for truthiness; explicitly allows or disallows truncation.
    writer_engine_version : {"V1", "V2"}, optional
        "V1" emits a warning and sets the V1 engine on the builder. "V2" and
        None (the declared default) leave the builder's engine version
        untouched (presumably V2; see the Arrow C++ docs).
    use_compliant_nested_type : bool
        Evaluated for truthiness; explicitly enables or disables compliant
        nested types.
    store_schema : bool
        When truthy, the original Arrow schema is stored in the file.

    Raises
    ------
    ValueError
        For an unsupported ``coerce_timestamps`` or ``writer_engine_version``.
    """
    cdef:
        shared_ptr[ArrowWriterProperties] arrow_properties
        ArrowWriterProperties.Builder arrow_props

    # Store the original Arrow schema so things like dictionary types can
    # be automatically reconstructed
    if store_schema:
        arrow_props.store_schema()

    # int96 support

    if use_deprecated_int96_timestamps:
        arrow_props.enable_deprecated_int96_timestamps()
    else:
        arrow_props.disable_deprecated_int96_timestamps()

    # coerce_timestamps

    if coerce_timestamps == 'ms':
        arrow_props.coerce_timestamps(TimeUnit_MILLI)
    elif coerce_timestamps == 'us':
        arrow_props.coerce_timestamps(TimeUnit_MICRO)
    elif coerce_timestamps is not None:
        raise ValueError('Invalid value for coerce_timestamps: {0}'
                         .format(coerce_timestamps))

    # allow_truncated_timestamps

    if allow_truncated_timestamps:
        arrow_props.allow_truncated_timestamps()
    else:
        arrow_props.disallow_truncated_timestamps()

    # use_compliant_nested_type

    if use_compliant_nested_type:
        arrow_props.enable_compliant_nested_types()
    else:
        arrow_props.disable_compliant_nested_types()

    # writer_engine_version

    if writer_engine_version == "V1":
        warnings.warn("V1 parquet writer engine is a no-op.  Use V2.")
        arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
    elif writer_engine_version not in (None, "V2"):
        # None is this function's own default and must not be rejected;
        # like "V2" it keeps whatever engine version the builder starts with.
        raise ValueError("Unsupported Writer Engine Version: {0}"
                         .format(writer_engine_version))

    arrow_properties = arrow_props.build()

    return arrow_properties

cdef _name_to_index_map(Schema arrow_schema):
    """
    Map each Parquet column's dotted path to its column index.

    The Arrow schema is converted to a Parquet schema with ``ToParquetSchema``,
    using default writer properties and "V2" Arrow writer properties. The
    result is a dict keyed by the dot-joined column path.
    """
    cdef:
        shared_ptr[CSchema] c_arrow_schema
        shared_ptr[SchemaDescriptor] c_parquet_schema
        SchemaDescriptor* descriptor
        shared_ptr[WriterProperties] writer_props = _create_writer_properties()
        shared_ptr[ArrowWriterProperties] arrow_writer_props = (
            _create_arrow_writer_properties(
                use_deprecated_int96_timestamps=False,
                coerce_timestamps=None,
                allow_truncated_timestamps=False,
                writer_engine_version="V2"
            )
        )

    c_arrow_schema = pyarrow_unwrap_schema(arrow_schema)

    # The schema conversion is pure C++ work, so the GIL can be released.
    with nogil:
        check_status(ToParquetSchema(
            c_arrow_schema.get(),
            deref(writer_props.get()),
            deref(arrow_writer_props.get()),
            &c_parquet_schema))

    descriptor = c_parquet_schema.get()
    name_to_index = {}

    for column_idx in range(descriptor.num_columns()):
        column_name = frombytes(
            descriptor.Column(column_idx).path().get().ToDotString())
        name_to_index[column_name] = column_idx

    return name_to_index


cdef class ParquetWriter(_Weakrefable):
    """
    Low-level Parquet file writer wrapping the C++ ``FileWriter``.

    ``where`` may be a path-like object, in which case the output file is
    opened (and later closed) by this writer, or any object accepted by
    ``get_writer``, in which case the sink is left open on ``close()``.
    """
    cdef:
        # C++ writer created in __cinit__
        unique_ptr[FileWriter] writer
        # Output stream the writer writes to
        shared_ptr[COutputStream] sink
        # True when this object opened ``sink`` itself and must close it
        bint own_sink

    # Read-only attributes named after the writer options.
    # NOTE(review): no method of this class, as visible here, assigns
    # them -- confirm whether they are populated elsewhere.
    cdef readonly:
        object use_dictionary
        object use_deprecated_int96_timestamps
        object use_byte_stream_split
        object column_encoding
        object coerce_timestamps
        object allow_truncated_timestamps
        object compression
        object compression_level
        object data_page_version
        object use_compliant_nested_type
        object version
        object write_statistics
        object writer_engine_version
        int row_group_size
        int64_t data_page_size
        FileEncryptionProperties encryption_properties
        int64_t write_batch_size
        int64_t dictionary_pagesize_limit
        object store_schema
        object store_decimal_as_integer

    def __cinit__(self, where, Schema schema not None, use_dictionary=None,
                  compression=None, version=None,
                  write_statistics=None,
                  MemoryPool memory_pool=None,
                  use_deprecated_int96_timestamps=False,
                  coerce_timestamps=None,
                  data_page_size=None,
                  allow_truncated_timestamps=False,
                  compression_level=None,
                  use_byte_stream_split=False,
                  column_encoding=None,
                  writer_engine_version=None,
                  data_page_version=None,
                  use_compliant_nested_type=True,
                  encryption_properties=None,
                  write_batch_size=None,
                  dictionary_pagesize_limit=None,
                  store_schema=True,
                  write_page_index=False,
                  write_page_checksum=False,
                  sorting_columns=None,
                  store_decimal_as_integer=False):
        # Build the writer/Arrow writer properties from the keyword options,
        # resolve ``where`` into an output stream, then open the C++
        # FileWriter for ``schema`` on that stream.
        cdef:
            shared_ptr[WriterProperties] properties
            shared_ptr[ArrowWriterProperties] arrow_properties
            c_string c_where
            CMemoryPool* pool

        # Validate the options and build both property objects *before*
        # touching the destination, so that an invalid argument raises
        # without the output file having been opened.  (Per the Arrow
        # documentation, FileOutputStream.Open truncates an existing file
        # by default.)
        properties = _create_writer_properties(
            use_dictionary=use_dictionary,
            compression=compression,
            version=version,
            write_statistics=write_statistics,
            data_page_size=data_page_size,
            compression_level=compression_level,
            use_byte_stream_split=use_byte_stream_split,
            column_encoding=column_encoding,
            data_page_version=data_page_version,
            encryption_properties=encryption_properties,
            write_batch_size=write_batch_size,
            dictionary_pagesize_limit=dictionary_pagesize_limit,
            write_page_index=write_page_index,
            write_page_checksum=write_page_checksum,
            sorting_columns=sorting_columns,
            store_decimal_as_integer=store_decimal_as_integer,
        )
        arrow_properties = _create_arrow_writer_properties(
            use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
            coerce_timestamps=coerce_timestamps,
            allow_truncated_timestamps=allow_truncated_timestamps,
            writer_engine_version=writer_engine_version,
            use_compliant_nested_type=use_compliant_nested_type,
            store_schema=store_schema,
        )

        try:
            where = _stringify_path(where)
        except TypeError:
            # Not path-like: let get_writer turn the object into a sink,
            # which remains owned by the caller.
            get_writer(where, &self.sink)
            self.own_sink = False
        else:
            c_where = tobytes(where)
            with nogil:
                self.sink = GetResultValue(FileOutputStream.Open(c_where))
            self.own_sink = True

        pool = maybe_unbox_memory_pool(memory_pool)
        with nogil:
            self.writer = move(GetResultValue(
                FileWriter.Open(deref(schema.schema), pool,
                                self.sink, properties, arrow_properties)))

    def close(self):
        """
        Close the Parquet writer and, if this object opened the sink
        itself, the underlying output stream as well.
        """
        with nogil:
            check_status(self.writer.get().Close())
            if self.own_sink:
                check_status(self.sink.get().Close())

    def write_table(self, Table table, row_group_size=None):
        """
        Write ``table`` to the file.

        Parameters
        ----------
        table : Table
        row_group_size : int, optional
            Row group size passed to the C++ ``WriteTable``.  None or -1
            selects ``min(table rows, _DEFAULT_ROW_GROUP_SIZE)``.

        Raises
        ------
        ValueError
            If ``row_group_size`` is 0.
        """
        cdef:
            CTable* ctable = table.table
            int64_t c_row_group_size

        if row_group_size is None or row_group_size == -1:
            c_row_group_size = min(ctable.num_rows(), _DEFAULT_ROW_GROUP_SIZE)
        elif row_group_size == 0:
            raise ValueError('Row group size cannot be 0')
        else:
            c_row_group_size = row_group_size

        with nogil:
            check_status(self.writer.get()
                         .WriteTable(deref(ctable), c_row_group_size))

    def add_key_value_metadata(self, key_value_metadata):
        """
        Pass key-value metadata to the writer.

        ``key_value_metadata`` is converted with ``KeyValueMetadata(...)``
        before being handed to the C++ ``AddKeyValueMetadata``.
        """
        cdef:
            shared_ptr[const CKeyValueMetadata] c_metadata

        c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(key_value_metadata))
        with nogil:
            check_status(self.writer.get()
                         .AddKeyValueMetadata(c_metadata))

    @property
    def metadata(self):
        """
        The written file's ``FileMetaData``.

        Raises RuntimeError while the C++ writer has no metadata to return
        (the error message states it is only available after close).
        """
        cdef:
            shared_ptr[CFileMetaData] metadata
            FileMetaData result
        with nogil:
            metadata = self.writer.get().metadata()
        if metadata:
            result = FileMetaData()
            result.init(metadata)
            return result
        raise RuntimeError(
            'file metadata is only available after writer close')