Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
pyarrow / _parquet.pyx
Size: Mime:
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++

import io
from textwrap import indent
import warnings

import numpy as np

from libcpp cimport nullptr

from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Array, Schema,
                          check_status,
                          MemoryPool, maybe_unbox_memory_pool,
                          Table, NativeFile,
                          pyarrow_wrap_chunked_array,
                          pyarrow_wrap_schema,
                          pyarrow_wrap_table,
                          pyarrow_wrap_buffer,
                          pyarrow_wrap_batch,
                          pyarrow_wrap_scalar,
                          NativeFile, get_reader, get_writer,
                          string_to_timeunit)

from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
                         _stringify_path, _datetime_from_int,
                         tobytes, frombytes)

cimport cpython as cp


cdef class Statistics(_Weakrefable):
    """Statistics for a single column in a single row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        return """{}
  has_min_max: {}
  min: {}
  max: {}
  null_count: {}
  distinct_count: {}
  num_values: {}
  physical_type: {}
  logical_type: {}
  converted_type (legacy): {}""".format(object.__repr__(self),
                                        self.has_min_max,
                                        self.min,
                                        self.max,
                                        self.null_count,
                                        self.distinct_count,
                                        self.num_values,
                                        self.physical_type,
                                        str(self.logical_type),
                                        self.converted_type)

    def to_dict(self):
        """
        Get dictionary represenation of statistics.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class.
        """
        d = dict(
            has_min_max=self.has_min_max,
            min=self.min,
            max=self.max,
            null_count=self.null_count,
            distinct_count=self.distinct_count,
            num_values=self.num_values,
            physical_type=self.physical_type
        )
        return d

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, Statistics other):
        """
        Return whether the two column statistics objects are equal.

        Parameters
        ----------
        other : Statistics
            Statistics to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.statistics.get().Equals(deref(other.statistics.get()))

    @property
    def has_min_max(self):
        """Whether min and max are present (bool)."""
        return self.statistics.get().HasMinMax()

    @property
    def has_null_count(self):
        """Whether null count is present (bool)."""
        return self.statistics.get().HasNullCount()

    @property
    def has_distinct_count(self):
        """Whether distinct count is preset (bool)."""
        return self.statistics.get().HasDistinctCount()

    @property
    def min_raw(self):
        """Min value as physical type (bool, int, float, or bytes)."""
        if self.has_min_max:
            return _cast_statistic_raw_min(self.statistics.get())
        else:
            return None

    @property
    def max_raw(self):
        """Max value as physical type (bool, int, float, or bytes)."""
        if self.has_min_max:
            return _cast_statistic_raw_max(self.statistics.get())
        else:
            return None

    @property
    def min(self):
        """
        Min value as logical type.

        Returned as the Python equivalent of logical type, such as datetime.date
        for dates and decimal.Decimal for decimals.
        """
        if self.has_min_max:
            min_scalar, _ = _cast_statistics(self.statistics.get())
            return min_scalar.as_py()
        else:
            return None

    @property
    def max(self):
        """
        Max value as logical type.

        Returned as the Python equivalent of logical type, such as datetime.date
        for dates and decimal.Decimal for decimals.
        """
        if self.has_min_max:
            _, max_scalar = _cast_statistics(self.statistics.get())
            return max_scalar.as_py()
        else:
            return None

    @property
    def null_count(self):
        """Number of null values in chunk (int)."""
        return self.statistics.get().null_count()

    @property
    def distinct_count(self):
        """
        Distinct number of values in chunk (int).

        If this is not set, will return 0.
        """
        # This seems to be zero if not set. See: ARROW-11793
        return self.statistics.get().distinct_count()

    @property
    def num_values(self):
        """Number of non-null values (int)."""
        return self.statistics.get().num_values()

    @property
    def physical_type(self):
        """Physical type of column (str)."""
        raw_physical_type = self.statistics.get().physical_type()
        return physical_type_name_from_enum(raw_physical_type)

    @property
    def logical_type(self):
        """Logical type of column (:class:`ParquetLogicalType`)."""
        return wrap_logical_type(self.statistics.get().descr().logical_type())

    @property
    def converted_type(self):
        """Legacy converted type (str or None)."""
        raw_converted_type = self.statistics.get().descr().converted_type()
        return converted_type_name_from_enum(raw_converted_type)


cdef class ParquetLogicalType(_Weakrefable):
    """Logical type of parquet type."""
    cdef:
        shared_ptr[const CParquetLogicalType] type

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
        self.type = type

    def __repr__(self):
        return "{}\n  {}".format(object.__repr__(self), str(self))

    def __str__(self):
        return frombytes(self.type.get().ToString(), safe=True)

    def to_json(self):
        """
        Get a JSON string containing type and type parameters.

        Returns
        -------
        json : str
            JSON representation of type, with at least a field called 'Type'
            which contains the type name. If the type is parameterized, such
            as a decimal with scale and precision, will contain those as fields
            as well.
        """
        return frombytes(self.type.get().ToJSON())

    @property
    def type(self):
        """Name of the logical type (str)."""
        return logical_type_name_from_enum(self.type.get().type())


cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
    cdef ParquetLogicalType out = ParquetLogicalType()
    out.init(type)
    return out


cdef _cast_statistic_raw_min(CStatistics* statistics):
    cdef ParquetType physical_type = statistics.physical_type()
    cdef uint32_t type_length = statistics.descr().type_length()
    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).min()
    elif physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).min()
    elif physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).min()
    elif physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).min()
    elif physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).min()
    elif physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).min())
    elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)


cdef _cast_statistic_raw_max(CStatistics* statistics):
    cdef ParquetType physical_type = statistics.physical_type()
    cdef uint32_t type_length = statistics.descr().type_length()
    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).max()
    elif physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).max()
    elif physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).max()
    elif physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).max()
    elif physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).max()
    elif physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).max())
    elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)


cdef _cast_statistics(CStatistics* statistics):
    cdef:
        shared_ptr[CScalar] c_min
        shared_ptr[CScalar] c_max
    check_status(StatisticsAsScalars(statistics[0], &c_min, &c_max))
    return (pyarrow_wrap_scalar(c_min), pyarrow_wrap_scalar(c_max))


cdef _box_byte_array(ParquetByteArray val):
    return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> val.len)


cdef _box_flba(ParquetFLBA val, uint32_t len):
    return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)


cdef class ColumnChunkMetaData(_Weakrefable):
    """Column metadata for a single row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        statistics = indent(repr(self.statistics), 4 * ' ')
        return """{0}
  file_offset: {1}
  file_path: {2}
  physical_type: {3}
  num_values: {4}
  path_in_schema: {5}
  is_stats_set: {6}
  statistics:
{7}
  compression: {8}
  encodings: {9}
  has_dictionary_page: {10}
  dictionary_page_offset: {11}
  data_page_offset: {12}
  total_compressed_size: {13}
  total_uncompressed_size: {14}""".format(object.__repr__(self),
                                          self.file_offset,
                                          self.file_path,
                                          self.physical_type,
                                          self.num_values,
                                          self.path_in_schema,
                                          self.is_stats_set,
                                          statistics,
                                          self.compression,
                                          self.encodings,
                                          self.has_dictionary_page,
                                          self.dictionary_page_offset,
                                          self.data_page_offset,
                                          self.total_compressed_size,
                                          self.total_uncompressed_size)

    def to_dict(self):
        """
        Get dictionary represenation of the column chunk metadata.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class.
        """
        statistics = self.statistics.to_dict() if self.is_stats_set else None
        d = dict(
            file_offset=self.file_offset,
            file_path=self.file_path,
            physical_type=self.physical_type,
            num_values=self.num_values,
            path_in_schema=self.path_in_schema,
            is_stats_set=self.is_stats_set,
            statistics=statistics,
            compression=self.compression,
            encodings=self.encodings,
            has_dictionary_page=self.has_dictionary_page,
            dictionary_page_offset=self.dictionary_page_offset,
            data_page_offset=self.data_page_offset,
            total_compressed_size=self.total_compressed_size,
            total_uncompressed_size=self.total_uncompressed_size
        )
        return d

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, ColumnChunkMetaData other):
        """
        Return whether the two column chunk metadata objects are equal.

        Parameters
        ----------
        other : ColumnChunkMetaData
            Metadata to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.metadata.Equals(deref(other.metadata))

    @property
    def file_offset(self):
        """Offset into file where column chunk is located (int)."""
        return self.metadata.file_offset()

    @property
    def file_path(self):
        """Optional file path if set (str or None)."""
        return frombytes(self.metadata.file_path())

    @property
    def physical_type(self):
        """Physical type of column (str)."""
        return physical_type_name_from_enum(self.metadata.type())

    @property
    def num_values(self):
        """Total number of values (int)."""
        return self.metadata.num_values()

    @property
    def path_in_schema(self):
        """Nested path to field, separated by periods (str)."""
        path = self.metadata.path_in_schema().get().ToDotString()
        return frombytes(path)

    @property
    def is_stats_set(self):
        """Whether or not statistics are present in metadata (bool)."""
        return self.metadata.is_stats_set()

    @property
    def statistics(self):
        """Statistics for column chunk (:class:`Statistics`)."""
        if not self.metadata.is_stats_set():
            return None
        statistics = Statistics()
        statistics.init(self.metadata.statistics(), self)
        return statistics

    @property
    def compression(self):
        """
        Type of compression used for column (str).

        One of 'UNCOMPRESSED', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD',
        or 'UNKNOWN'.
        """
        return compression_name_from_enum(self.metadata.compression())

    @property
    def encodings(self):
        """
        Encodings used for column (tuple of str).

        One of 'PLAIN', 'BIT_PACKED', 'RLE', 'BYTE_STREAM_SPLIT', 'DELTA_BINARY_PACKED',
        'DELTA_BYTE_ARRAY'.
        """
        return tuple(map(encoding_name_from_enum, self.metadata.encodings()))

    @property
    def has_dictionary_page(self):
        """Whether there is dictionary data present in the column chunk (bool)."""
        return bool(self.metadata.has_dictionary_page())

    @property
    def dictionary_page_offset(self):
        """Offset of dictionary page reglative to column chunk offset (int)."""
        if self.has_dictionary_page:
            return self.metadata.dictionary_page_offset()
        else:
            return None

    @property
    def data_page_offset(self):
        """Offset of data page reglative to column chunk offset (int)."""
        return self.metadata.data_page_offset()

    @property
    def has_index_page(self):
        """Not yet supported."""
        raise NotImplementedError('not supported in parquet-cpp')

    @property
    def index_page_offset(self):
        """Not yet supported."""
        raise NotImplementedError("parquet-cpp doesn't return valid values")

    @property
    def total_compressed_size(self):
        """Compresssed size in bytes (int)."""
        return self.metadata.total_compressed_size()

    @property
    def total_uncompressed_size(self):
        """Uncompressed size in bytes (int)."""
        return self.metadata.total_uncompressed_size()


cdef class RowGroupMetaData(_Weakrefable):
    """Metadata for a single row group."""

    def __cinit__(self, FileMetaData parent, int index):
        if index < 0 or index >= parent.num_row_groups:
            raise IndexError('{0} out of bounds'.format(index))
        self.up_metadata = parent._metadata.RowGroup(index)
        self.metadata = self.up_metadata.get()
        self.parent = parent
        self.index = index

    def __reduce__(self):
        return RowGroupMetaData, (self.parent, self.index)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, RowGroupMetaData other):
        """
        Return whether the two row group metadata objects are equal.

        Parameters
        ----------
        other : RowGroupMetaData
            Metadata to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.metadata.Equals(deref(other.metadata))

    def column(self, int i):
        """
        Get column metadata at given index.

        Parameters
        ----------
        i : int
            Index of column to get metadata for.

        Returns
        -------
        ColumnChunkMetaData
            Metadata for column within this chunk.
        """
        if i < 0 or i >= self.num_columns:
            raise IndexError('{0} out of bounds'.format(i))
        chunk = ColumnChunkMetaData()
        chunk.init(self, i)
        return chunk

    def __repr__(self):
        return """{0}
  num_columns: {1}
  num_rows: {2}
  total_byte_size: {3}""".format(object.__repr__(self),
                                 self.num_columns,
                                 self.num_rows,
                                 self.total_byte_size)

    def to_dict(self):
        """
        Get dictionary represenation of the row group metadata.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class.
        """
        columns = []
        d = dict(
            num_columns=self.num_columns,
            num_rows=self.num_rows,
            total_byte_size=self.total_byte_size,
            columns=columns,
        )
        for i in range(self.num_columns):
            columns.append(self.column(i).to_dict())
        return d

    @property
    def num_columns(self):
        """Number of columns in this row group (int)."""
        return self.metadata.num_columns()

    @property
    def num_rows(self):
        """Number of rows in this row group (int)."""
        return self.metadata.num_rows()

    @property
    def total_byte_size(self):
        """Total byte size of all the uncompressed column data in this row group (int)."""
        return self.metadata.total_byte_size()


def _reconstruct_filemetadata(Buffer serialized):
    cdef:
        FileMetaData metadata = FileMetaData.__new__(FileMetaData)
        CBuffer *buffer = serialized.buffer.get()
        uint32_t metadata_len = <uint32_t>buffer.size()

    metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))

    return metadata


cdef class FileMetaData(_Weakrefable):
    """Parquet metadata for a single file."""

    def __cinit__(self):
        pass

    def __reduce__(self):
        cdef:
            NativeFile sink = BufferOutputStream()
            COutputStream* c_sink = sink.get_output_stream().get()
        with nogil:
            self._metadata.WriteTo(c_sink)

        cdef Buffer buffer = sink.getvalue()
        return _reconstruct_filemetadata, (buffer,)

    def __repr__(self):
        return """{0}
  created_by: {1}
  num_columns: {2}
  num_rows: {3}
  num_row_groups: {4}
  format_version: {5}
  serialized_size: {6}""".format(object.__repr__(self),
                                 self.created_by, self.num_columns,
                                 self.num_rows, self.num_row_groups,
                                 self.format_version,
                                 self.serialized_size)

    def to_dict(self):
        """
        Get dictionary represenation of the file metadata.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class.
        """
        row_groups = []
        d = dict(
            created_by=self.created_by,
            num_columns=self.num_columns,
            num_rows=self.num_rows,
            num_row_groups=self.num_row_groups,
            row_groups=row_groups,
            format_version=self.format_version,
            serialized_size=self.serialized_size
        )
        for i in range(self.num_row_groups):
            row_groups.append(self.row_group(i).to_dict())
        return d

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, FileMetaData other not None):
        """
        Return whether the two file metadata objects are equal.

        Parameters
        ----------
        other : FileMetaData
            Metadata to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self._metadata.Equals(deref(other._metadata))

    @property
    def schema(self):
        """Schema of the file (:class:`ParquetSchema`)."""
        if self._schema is None:
            self._schema = ParquetSchema(self)
        return self._schema

    @property
    def serialized_size(self):
        """Size of the original thrift encoded metadata footer (int)."""
        return self._metadata.size()

    @property
    def num_columns(self):
        """Number of columns in file (int)."""
        return self._metadata.num_columns()

    @property
    def num_rows(self):
        """Total number of rows in file (int)."""
        return self._metadata.num_rows()

    @property
    def num_row_groups(self):
        """Number of row groups in file (int)."""
        return self._metadata.num_row_groups()

    @property
    def format_version(self):
        """
        Parquet format version used in file (str, such as '1.0', '2.4').

        If version is missing or unparsable, will default to assuming '2.4'.
        """
        cdef ParquetVersion version = self._metadata.version()
        if version == ParquetVersion_V1:
            return '1.0'
        elif version == ParquetVersion_V2_0:
            return 'pseudo-2.0'
        elif version == ParquetVersion_V2_4:
            return '2.4'
        elif version == ParquetVersion_V2_6:
            return '2.6'
        else:
            warnings.warn('Unrecognized file version, assuming 2.4: {}'
                          .format(version))
            return '2.4'

    @property
    def created_by(self):
        """
        String describing source of the parquet file (str).

        This typically includes library name and version number. For example, Arrow 7.0's
        writer returns 'parquet-cpp-arrow version 7.0.0'.
        """
        return frombytes(self._metadata.created_by())

    @property
    def metadata(self):
        """Additional metadata as key value pairs (dict[bytes, bytes])."""
        cdef:
            unordered_map[c_string, c_string] metadata
            const CKeyValueMetadata* underlying_metadata
        underlying_metadata = self._metadata.key_value_metadata().get()
        if underlying_metadata != NULL:
            underlying_metadata.ToUnorderedMap(&metadata)
            return metadata
        else:
            return None

    def row_group(self, int i):
        """
        Get metadata for row group at index i.

        Parameters
        ----------
        i : int
            Row group index to get.

        Returns
        -------
        row_group_metadata : RowGroupMetaData
        """
        return RowGroupMetaData(self, i)

    def set_file_path(self, path):
        """
        Set ColumnChunk file paths to the given value.

        This method modifies the ``file_path`` field of each ColumnChunk
        in the FileMetaData to be a particular value.

        Parameters
        ----------
        path : str
            The file path to set on all ColumnChunks.
        """
        cdef:
            c_string c_path = tobytes(path)
        self._metadata.set_file_path(c_path)

    def append_row_groups(self, FileMetaData other):
        """
        Append row groups from other FileMetaData object.

        Parameters
        ----------
        other : FileMetaData
            Other metadata to append row groups from.
        """
        cdef shared_ptr[CFileMetaData] c_metadata

        c_metadata = other.sp_metadata
        self._metadata.AppendRowGroups(deref(c_metadata))

    def write_metadata_file(self, where):
        """
        Write the metadata to a metadata-only Parquet file.

        Parameters
        ----------
        where : path or file-like object
            Where to write the metadata.  Should be a writable path on
            the local filesystem, or a writable file-like object.
        """
        cdef:
            shared_ptr[COutputStream] sink
            c_string c_where

        try:
            where = _stringify_path(where)
        except TypeError:
            get_writer(where, &sink)
        else:
            c_where = tobytes(where)
            with nogil:
                sink = GetResultValue(FileOutputStream.Open(c_where))

        with nogil:
            check_status(
                WriteMetaDataFile(deref(self._metadata), sink.get()))


cdef class ParquetSchema(_Weakrefable):
    """A Parquet schema."""

    def __cinit__(self, FileMetaData container):
        self.parent = container
        self.schema = container._metadata.schema()

    def __repr__(self):
        return "{0}\n{1}".format(
            object.__repr__(self),
            frombytes(self.schema.ToString(), safe=True))

    def __reduce__(self):
        return ParquetSchema, (self.parent,)

    def __len__(self):
        return self.schema.num_columns()

    def __getitem__(self, i):
        return self.column(i)

    @property
    def names(self):
        """Name of each field (list of str)."""
        return [self[i].name for i in range(len(self))]

    def to_arrow_schema(self):
        """
        Convert Parquet schema to effective Arrow schema.

        Returns
        -------
        schema : Schema
        """
        cdef shared_ptr[CSchema] sp_arrow_schema

        with nogil:
            check_status(FromParquetSchema(
                self.schema, default_arrow_reader_properties(),
                self.parent._metadata.key_value_metadata(),
                &sp_arrow_schema))

        return pyarrow_wrap_schema(sp_arrow_schema)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, ParquetSchema other):
        """
        Return whether the two schemas are equal.

        Parameters
        ----------
        other : ParquetSchema
            Schema to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.schema.Equals(deref(other.schema))

    def column(self, i):
        """
        Return the schema for a single column.

        Parameters
        ----------
        i : int
            Index of column in schema.

        Returns
        -------
        column_schema : ColumnSchema
        """
        if i < 0 or i >= len(self):
            raise IndexError('{0} out of bounds'.format(i))

        return ColumnSchema(self, i)


cdef class ColumnSchema(_Weakrefable):
    """Schema for a single column."""
    cdef:
        int index
        ParquetSchema parent
        const ColumnDescriptor* descr

    def __cinit__(self, ParquetSchema schema, int index):
        self.parent = schema
        self.index = index  # for pickling support
        self.descr = schema.schema.Column(index)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def __reduce__(self):
        return ColumnSchema, (self.parent, self.index)

    def equals(self, ColumnSchema other):
        """
        Return whether the two column schemas are equal.

        Parameters
        ----------
        other : ColumnSchema
            Schema to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.descr.Equals(deref(other.descr))

    def __repr__(self):
        physical_type = self.physical_type
        converted_type = self.converted_type
        if converted_type == 'DECIMAL':
            converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
                                                        self.scale)
        elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
            converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
                              .format(self.length))

        return """<ParquetColumnSchema>
  name: {0}
  path: {1}
  max_definition_level: {2}
  max_repetition_level: {3}
  physical_type: {4}
  logical_type: {5}
  converted_type (legacy): {6}""".format(self.name, self.path,
                                         self.max_definition_level,
                                         self.max_repetition_level,
                                         physical_type,
                                         str(self.logical_type),
                                         converted_type)

    @property
    def name(self):
        """Name of field (str)."""
        return frombytes(self.descr.name())

    @property
    def path(self):
        """Nested path to field, separated by periods (str)."""
        return frombytes(self.descr.path().get().ToDotString())

    @property
    def max_definition_level(self):
        """Maximum definition level (int)."""
        return self.descr.max_definition_level()

    @property
    def max_repetition_level(self):
        """Maximum repetition level (int)."""
        return self.descr.max_repetition_level()

    @property
    def physical_type(self):
        """Name of physical type (str)."""
        return physical_type_name_from_enum(self.descr.physical_type())

    @property
    def logical_type(self):
        """Logical type of column (:class:`ParquetLogicalType`)."""
        return wrap_logical_type(self.descr.logical_type())

    @property
    def converted_type(self):
        """Legacy converted type (str or None)."""
        return converted_type_name_from_enum(self.descr.converted_type())

    # FIXED_LEN_BYTE_ARRAY attribute
    @property
    def length(self):
        """Array length if fixed length byte array type, None otherwise (int or None)."""
        return self.descr.type_length()

    # Decimal attributes
    @property
    def precision(self):
        """Precision if decimal type, None otherwise (int or None)."""
        return self.descr.type_precision()

    @property
    def scale(self):
        """Scale if decimal type, None otherwise (int or None)."""
        return self.descr.type_scale()


cdef physical_type_name_from_enum(ParquetType type_):
    return {
        ParquetType_BOOLEAN: 'BOOLEAN',
        ParquetType_INT32: 'INT32',
        ParquetType_INT64: 'INT64',
        ParquetType_INT96: 'INT96',
        ParquetType_FLOAT: 'FLOAT',
        ParquetType_DOUBLE: 'DOUBLE',
        ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
        ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
    }.get(type_, 'UNKNOWN')


cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
    return {
        ParquetLogicalType_UNDEFINED: 'UNDEFINED',
        ParquetLogicalType_STRING: 'STRING',
        ParquetLogicalType_MAP: 'MAP',
        ParquetLogicalType_LIST: 'LIST',
        ParquetLogicalType_ENUM: 'ENUM',
        ParquetLogicalType_DECIMAL: 'DECIMAL',
        ParquetLogicalType_DATE: 'DATE',
        ParquetLogicalType_TIME: 'TIME',
        ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
        ParquetLogicalType_INT: 'INT',
        ParquetLogicalType_JSON: 'JSON',
        ParquetLogicalType_BSON: 'BSON',
        ParquetLogicalType_UUID: 'UUID',
        ParquetLogicalType_NONE: 'NONE',
    }.get(type_, 'UNKNOWN')


cdef converted_type_name_from_enum(ParquetConvertedType type_):
    return {
        ParquetConvertedType_NONE: 'NONE',
        ParquetConvertedType_UTF8: 'UTF8',
        ParquetConvertedType_MAP: 'MAP',
        ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
        ParquetConvertedType_LIST: 'LIST',
        ParquetConvertedType_ENUM: 'ENUM',
        ParquetConvertedType_DECIMAL: 'DECIMAL',
        ParquetConvertedType_DATE: 'DATE',
        ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
        ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
        ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
        ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
        ParquetConvertedType_UINT_8: 'UINT_8',
        ParquetConvertedType_UINT_16: 'UINT_16',
        ParquetConvertedType_UINT_32: 'UINT_32',
        ParquetConvertedType_UINT_64: 'UINT_64',
        ParquetConvertedType_INT_8: 'INT_8',
        ParquetConvertedType_INT_16: 'INT_16',
        ParquetConvertedType_INT_32: 'INT_32',
        ParquetConvertedType_INT_64: 'INT_64',
        ParquetConvertedType_JSON: 'JSON',
        ParquetConvertedType_BSON: 'BSON',
        ParquetConvertedType_INTERVAL: 'INTERVAL',
    }.get(type_, 'UNKNOWN')


cdef encoding_name_from_enum(ParquetEncoding encoding_):
    return {
        ParquetEncoding_PLAIN: 'PLAIN',
        ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
        ParquetEncoding_RLE: 'RLE',
        ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
        ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
        ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
        ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
        ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
        ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
    }.get(encoding_, 'UNKNOWN')


cdef encoding_enum_from_name(str encoding_name):
    enc = {
        'PLAIN': ParquetEncoding_PLAIN,
        'BIT_PACKED': ParquetEncoding_BIT_PACKED,
        'RLE': ParquetEncoding_RLE,
        'BYTE_STREAM_SPLIT': ParquetEncoding_BYTE_STREAM_SPLIT,
        'DELTA_BINARY_PACKED': ParquetEncoding_DELTA_BINARY_PACKED,
        'DELTA_BYTE_ARRAY': ParquetEncoding_DELTA_BYTE_ARRAY,
        'RLE_DICTIONARY': 'dict',
        'PLAIN_DICTIONARY': 'dict',
    }.get(encoding_name, None)
    if enc is None:
        raise ValueError(f"Unsupported column encoding: {encoding_name!r}")
    elif enc == 'dict':
        raise ValueError(f"{encoding_name!r} is already used by default.")
    else:
        return enc


cdef compression_name_from_enum(ParquetCompression compression_):
    return {
        ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
        ParquetCompression_SNAPPY: 'SNAPPY',
        ParquetCompression_GZIP: 'GZIP',
        ParquetCompression_LZO: 'LZO',
        ParquetCompression_BROTLI: 'BROTLI',
        ParquetCompression_LZ4: 'LZ4',
        ParquetCompression_ZSTD: 'ZSTD',
    }.get(compression_, 'UNKNOWN')


cdef int check_compression_name(name) except -1:
    if name.upper() not in {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4',
                            'ZSTD'}:
        raise ArrowException("Unsupported compression: " + name)
    return 0


cdef ParquetCompression compression_from_name(name):
    name = name.upper()
    if name == 'SNAPPY':
        return ParquetCompression_SNAPPY
    elif name == 'GZIP':
        return ParquetCompression_GZIP
    elif name == 'LZO':
        return ParquetCompression_LZO
    elif name == 'BROTLI':
        return ParquetCompression_BROTLI
    elif name == 'LZ4':
        return ParquetCompression_LZ4
    elif name == 'ZSTD':
        return ParquetCompression_ZSTD
    else:
        return ParquetCompression_UNCOMPRESSED


cdef class ParquetReader(_Weakrefable):
    cdef:
        object source
        CMemoryPool* pool
        unique_ptr[FileReader] reader
        FileMetaData _metadata

    cdef public:
        _column_idx_map

    def __cinit__(self, MemoryPool memory_pool=None):
        self.pool = maybe_unbox_memory_pool(memory_pool)
        self._metadata = None

    def open(self, object source not None, *, bint use_memory_map=False,
             read_dictionary=None, FileMetaData metadata=None,
             int buffer_size=0, bint pre_buffer=False,
             coerce_int96_timestamp_unit=None,
             FileDecryptionProperties decryption_properties=None,
             thrift_string_size_limit=None,
             thrift_container_size_limit=None):
        cdef:
            shared_ptr[CRandomAccessFile] rd_handle
            shared_ptr[CFileMetaData] c_metadata
            CReaderProperties properties = default_reader_properties()
            ArrowReaderProperties arrow_props = (
                default_arrow_reader_properties())
            c_string path
            FileReaderBuilder builder
            TimeUnit int96_timestamp_unit_code

        if metadata is not None:
            c_metadata = metadata.sp_metadata

        if buffer_size > 0:
            properties.enable_buffered_stream()
            properties.set_buffer_size(buffer_size)
        elif buffer_size == 0:
            properties.disable_buffered_stream()
        else:
            raise ValueError('Buffer size must be larger than zero')

        if thrift_string_size_limit is not None:
            if thrift_string_size_limit <= 0:
                raise ValueError("thrift_string_size_limit "
                                 "must be larger than zero")
            properties.set_thrift_string_size_limit(thrift_string_size_limit)
        if thrift_container_size_limit is not None:
            if thrift_container_size_limit <= 0:
                raise ValueError("thrift_container_size_limit "
                                 "must be larger than zero")
            properties.set_thrift_container_size_limit(
                thrift_container_size_limit)

        if decryption_properties is not None:
            properties.file_decryption_properties(
                decryption_properties.unwrap())

        arrow_props.set_pre_buffer(pre_buffer)

        if coerce_int96_timestamp_unit is None:
            # use the default defined in default_arrow_reader_properties()
            pass
        else:
            arrow_props.set_coerce_int96_timestamp_unit(
                string_to_timeunit(coerce_int96_timestamp_unit))

        self.source = source

        get_reader(source, use_memory_map, &rd_handle)
        with nogil:
            check_status(builder.Open(rd_handle, properties, c_metadata))

        # Set up metadata
        with nogil:
            c_metadata = builder.raw_reader().metadata()
        self._metadata = result = FileMetaData()
        result.init(c_metadata)

        if read_dictionary is not None:
            self._set_read_dictionary(read_dictionary, &arrow_props)

        with nogil:
            check_status(builder.memory_pool(self.pool)
                         .properties(arrow_props)
                         .Build(&self.reader))

    cdef _set_read_dictionary(self, read_dictionary,
                              ArrowReaderProperties* props):
        for column in read_dictionary:
            if not isinstance(column, int):
                column = self.column_name_idx(column)
            props.set_read_dictionary(column, True)

    @property
    def column_paths(self):
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            vector[c_string] path
            int i = 0

        paths = []
        for i in range(0, metadata.num_columns()):
            path = (metadata.schema().Column(i)
                    .path().get().ToDotVector())
            paths.append([frombytes(x) for x in path])

        return paths

    @property
    def metadata(self):
        return self._metadata

    @property
    def schema_arrow(self):
        cdef shared_ptr[CSchema] out
        with nogil:
            check_status(self.reader.get().GetSchema(&out))
        return pyarrow_wrap_schema(out)

    @property
    def num_row_groups(self):
        return self.reader.get().num_row_groups()

    def set_use_threads(self, bint use_threads):
        self.reader.get().set_use_threads(use_threads)

    def set_batch_size(self, int64_t batch_size):
        self.reader.get().set_batch_size(batch_size)

    def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
                     bint use_threads=True):
        cdef:
            vector[int] c_row_groups
            vector[int] c_column_indices
            shared_ptr[CRecordBatch] record_batch
            shared_ptr[TableBatchReader] batch_reader
            unique_ptr[CRecordBatchReader] recordbatchreader

        self.set_batch_size(batch_size)

        if use_threads:
            self.set_use_threads(use_threads)

        for row_group in row_groups:
            c_row_groups.push_back(row_group)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)
            with nogil:
                check_status(
                    self.reader.get().GetRecordBatchReader(
                        c_row_groups, c_column_indices, &recordbatchreader
                    )
                )
        else:
            with nogil:
                check_status(
                    self.reader.get().GetRecordBatchReader(
                        c_row_groups, &recordbatchreader
                    )
                )

        while True:
            with nogil:
                check_status(
                    recordbatchreader.get().ReadNext(&record_batch)
                )

            if record_batch.get() == NULL:
                break

            yield pyarrow_wrap_batch(record_batch)

    def read_row_group(self, int i, column_indices=None,
                       bint use_threads=True):
        return self.read_row_groups([i], column_indices, use_threads)

    def read_row_groups(self, row_groups not None, column_indices=None,
                        bint use_threads=True):
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_row_groups
            vector[int] c_column_indices

        self.set_use_threads(use_threads)

        for row_group in row_groups:
            c_row_groups.push_back(row_group)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroups(c_row_groups, c_column_indices,
                                            &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadRowGroups(c_row_groups, &ctable))
        return pyarrow_wrap_table(ctable)

    def read_all(self, column_indices=None, bint use_threads=True):
        cdef:
            shared_ptr[CTable] ctable
            vector[int] c_column_indices

        self.set_use_threads(use_threads)

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

            with nogil:
                check_status(self.reader.get()
                             .ReadTable(c_column_indices, &ctable))
        else:
            # Read all columns
            with nogil:
                check_status(self.reader.get()
                             .ReadTable(&ctable))
        return pyarrow_wrap_table(ctable)

    def scan_contents(self, column_indices=None, batch_size=65536):
        cdef:
            vector[int] c_column_indices
            int32_t c_batch_size
            int64_t c_num_rows

        if column_indices is not None:
            for index in column_indices:
                c_column_indices.push_back(index)

        c_batch_size = batch_size

        with nogil:
            check_status(self.reader.get()
                         .ScanContents(c_column_indices, c_batch_size,
                                       &c_num_rows))

        return c_num_rows

    def column_name_idx(self, column_name):
        """
        Find the index of a column by its name.

        Parameters
        ----------
        column_name : str
            Name of the column; separation of nesting levels is done via ".".

        Returns
        -------
        column_idx : int
            Integer index of the column in the schema.
        """
        cdef:
            FileMetaData container = self.metadata
            const CFileMetaData* metadata = container._metadata
            int i = 0

        if self._column_idx_map is None:
            self._column_idx_map = {}
            for i in range(0, metadata.num_columns()):
                col_bytes = tobytes(metadata.schema().Column(i)
                                    .path().get().ToDotString())
                self._column_idx_map[col_bytes] = i

        return self._column_idx_map[tobytes(column_name)]

    def read_column(self, int column_index):
        cdef shared_ptr[CChunkedArray] out
        with nogil:
            check_status(self.reader.get()
                         .ReadColumn(column_index, &out))
        return pyarrow_wrap_chunked_array(out)


cdef shared_ptr[WriterProperties] _create_writer_properties(
        use_dictionary=None,
        compression=None,
        version=None,
        write_statistics=None,
        data_page_size=None,
        compression_level=None,
        use_byte_stream_split=False,
        column_encoding=None,
        data_page_version=None,
        FileEncryptionProperties encryption_properties=None,
        write_batch_size=None,
        dictionary_pagesize_limit=None) except *:
    """General writer properties"""
    cdef:
        shared_ptr[WriterProperties] properties
        WriterProperties.Builder props

    # data_page_version

    if data_page_version is not None:
        if data_page_version == "1.0":
            props.data_page_version(ParquetDataPageVersion_V1)
        elif data_page_version == "2.0":
            props.data_page_version(ParquetDataPageVersion_V2)
        else:
            raise ValueError("Unsupported Parquet data page version: {0}"
                             .format(data_page_version))

    # version

    if version is not None:
        if version == "1.0":
            props.version(ParquetVersion_V1)
        elif version in ("2.0", "pseudo-2.0"):
            warnings.warn(
                "Parquet format '2.0' pseudo version is deprecated, use "
                "'2.4' or '2.6' for fine-grained feature selection",
                FutureWarning, stacklevel=2)
            props.version(ParquetVersion_V2_0)
        elif version == "2.4":
            props.version(ParquetVersion_V2_4)
        elif version == "2.6":
            props.version(ParquetVersion_V2_6)
        else:
            raise ValueError("Unsupported Parquet format version: {0}"
                             .format(version))

    # compression

    if isinstance(compression, basestring):
        check_compression_name(compression)
        props.compression(compression_from_name(compression))
    elif compression is not None:
        for column, codec in compression.iteritems():
            check_compression_name(codec)
            props.compression(tobytes(column), compression_from_name(codec))

    if isinstance(compression_level, int):
        props.compression_level(compression_level)
    elif compression_level is not None:
        for column, level in compression_level.iteritems():
            props.compression_level(tobytes(column), level)

    # use_dictionary

    if isinstance(use_dictionary, bool):
        if use_dictionary:
            props.enable_dictionary()
            if column_encoding is not None:
                raise ValueError(
                    "To use 'column_encoding' set 'use_dictionary' to False")
        else:
            props.disable_dictionary()
    elif use_dictionary is not None:
        # Deactivate dictionary encoding by default
        props.disable_dictionary()
        for column in use_dictionary:
            props.enable_dictionary(tobytes(column))
            if (column_encoding is not None and
                    column_encoding.get(column) is not None):
                raise ValueError(
                    "To use 'column_encoding' set 'use_dictionary' to False")

    # write_statistics

    if isinstance(write_statistics, bool):
        if write_statistics:
            props.enable_statistics()
        else:
            props.disable_statistics()
    elif write_statistics is not None:
        # Deactivate statistics by default and enable for specified columns
        props.disable_statistics()
        for column in write_statistics:
            props.enable_statistics(tobytes(column))

    # use_byte_stream_split

    if isinstance(use_byte_stream_split, bool):
        if use_byte_stream_split:
            if column_encoding is not None:
                raise ValueError(
                    "'use_byte_stream_split' can not be passed"
                    "together with 'column_encoding'")
            else:
                props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
    elif use_byte_stream_split is not None:
        for column in use_byte_stream_split:
            if column_encoding is None:
                column_encoding = {column: 'BYTE_STREAM_SPLIT'}
            elif column_encoding.get(column, None) is None:
                column_encoding[column] = 'BYTE_STREAM_SPLIT'
            else:
                raise ValueError(
                    "'use_byte_stream_split' can not be passed"
                    "together with 'column_encoding'")

    # column_encoding
    # encoding map - encode individual columns

    if column_encoding is not None:
        if isinstance(column_encoding, dict):
            for column, _encoding in column_encoding.items():
                props.encoding(tobytes(column),
                               encoding_enum_from_name(_encoding))
        elif isinstance(column_encoding, str):
            props.encoding(encoding_enum_from_name(column_encoding))
        else:
            raise TypeError(
                "'column_encoding' should be a dictionary or a string")

    if data_page_size is not None:
        props.data_pagesize(data_page_size)

    if write_batch_size is not None:
        props.write_batch_size(write_batch_size)

    if dictionary_pagesize_limit is not None:
        props.dictionary_pagesize_limit(dictionary_pagesize_limit)

    # encryption

    if encryption_properties is not None:
        props.encryption(
            (<FileEncryptionProperties>encryption_properties).unwrap())

    properties = props.build()

    return properties


cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
        use_deprecated_int96_timestamps=False,
        coerce_timestamps=None,
        allow_truncated_timestamps=False,
        writer_engine_version=None,
        use_compliant_nested_type=False) except *:
    """Arrow writer properties"""
    cdef:
        shared_ptr[ArrowWriterProperties] arrow_properties
        ArrowWriterProperties.Builder arrow_props

    # Store the original Arrow schema so things like dictionary types can
    # be automatically reconstructed
    arrow_props.store_schema()

    # int96 support

    if use_deprecated_int96_timestamps:
        arrow_props.enable_deprecated_int96_timestamps()
    else:
        arrow_props.disable_deprecated_int96_timestamps()

    # coerce_timestamps

    if coerce_timestamps == 'ms':
        arrow_props.coerce_timestamps(TimeUnit_MILLI)
    elif coerce_timestamps == 'us':
        arrow_props.coerce_timestamps(TimeUnit_MICRO)
    elif coerce_timestamps is not None:
        raise ValueError('Invalid value for coerce_timestamps: {0}'
                         .format(coerce_timestamps))

    # allow_truncated_timestamps

    if allow_truncated_timestamps:
        arrow_props.allow_truncated_timestamps()
    else:
        arrow_props.disallow_truncated_timestamps()

    # use_compliant_nested_type

    if use_compliant_nested_type:
        arrow_props.enable_compliant_nested_types()
    else:
        arrow_props.disable_compliant_nested_types()

    # writer_engine_version

    if writer_engine_version == "V1":
        warnings.warn("V1 parquet writer engine is a no-op.  Use V2.")
        arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
    elif writer_engine_version != "V2":
        raise ValueError("Unsupported Writer Engine Version: {0}"
                         .format(writer_engine_version))

    arrow_properties = arrow_props.build()

    return arrow_properties


cdef class ParquetWriter(_Weakrefable):
    cdef:
        unique_ptr[FileWriter] writer
        shared_ptr[COutputStream] sink
        bint own_sink

    cdef readonly:
        object use_dictionary
        object use_deprecated_int96_timestamps
        object use_byte_stream_split
        object column_encoding
        object coerce_timestamps
        object allow_truncated_timestamps
        object compression
        object compression_level
        object data_page_version
        object use_compliant_nested_type
        object version
        object write_statistics
        object writer_engine_version
        int row_group_size
        int64_t data_page_size
        FileEncryptionProperties encryption_properties
        int64_t write_batch_size
        int64_t dictionary_pagesize_limit

    def __cinit__(self, where, Schema schema, use_dictionary=None,
                  compression=None, version=None,
                  write_statistics=None,
                  MemoryPool memory_pool=None,
                  use_deprecated_int96_timestamps=False,
                  coerce_timestamps=None,
                  data_page_size=None,
                  allow_truncated_timestamps=False,
                  compression_level=None,
                  use_byte_stream_split=False,
                  column_encoding=None,
                  writer_engine_version=None,
                  data_page_version=None,
                  use_compliant_nested_type=False,
                  encryption_properties=None,
                  write_batch_size=None,
                  dictionary_pagesize_limit=None):
        cdef:
            shared_ptr[WriterProperties] properties
            shared_ptr[ArrowWriterProperties] arrow_properties
            c_string c_where
            CMemoryPool* pool

        try:
            where = _stringify_path(where)
        except TypeError:
            get_writer(where, &self.sink)
            self.own_sink = False
        else:
            c_where = tobytes(where)
            with nogil:
                self.sink = GetResultValue(FileOutputStream.Open(c_where))
            self.own_sink = True

        properties = _create_writer_properties(
            use_dictionary=use_dictionary,
            compression=compression,
            version=version,
            write_statistics=write_statistics,
            data_page_size=data_page_size,
            compression_level=compression_level,
            use_byte_stream_split=use_byte_stream_split,
            column_encoding=column_encoding,
            data_page_version=data_page_version,
            encryption_properties=encryption_properties,
            write_batch_size=write_batch_size,
            dictionary_pagesize_limit=dictionary_pagesize_limit
        )
        arrow_properties = _create_arrow_writer_properties(
            use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
            coerce_timestamps=coerce_timestamps,
            allow_truncated_timestamps=allow_truncated_timestamps,
            writer_engine_version=writer_engine_version,
            use_compliant_nested_type=use_compliant_nested_type
        )

        pool = maybe_unbox_memory_pool(memory_pool)
        with nogil:
            check_status(
                FileWriter.Open(deref(schema.schema), pool,
                                self.sink, properties, arrow_properties,
                                &self.writer))

    def close(self):
        with nogil:
            check_status(self.writer.get().Close())
            if self.own_sink:
                check_status(self.sink.get().Close())

    def write_table(self, Table table, row_group_size=None):
        cdef:
            CTable* ctable = table.table
            int64_t c_row_group_size

        if row_group_size is None or row_group_size == -1:
            c_row_group_size = ctable.num_rows()
        elif row_group_size == 0:
            raise ValueError('Row group size cannot be 0')
        else:
            c_row_group_size = row_group_size

        with nogil:
            check_status(self.writer.get()
                         .WriteTable(deref(ctable), c_row_group_size))

    @property
    def metadata(self):
        cdef:
            shared_ptr[CFileMetaData] metadata
            FileMetaData result
        with nogil:
            metadata = self.writer.get().metadata()
        if metadata:
            result = FileMetaData()
            result.init(metadata)
            return result
        raise RuntimeError(
            'file metadata is only available after writer close')