Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 20.0.0.dev104 

/ _dataset_parquet.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

"""Dataset support for Parquet file format."""

from cython.operator cimport dereference as deref

import os
import warnings

import pyarrow as pa
from pyarrow.lib cimport *
from pyarrow.lib import frombytes, tobytes, is_threading_enabled
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.includes.libarrow_dataset_parquet cimport *
from pyarrow._fs cimport FileSystem

from pyarrow._compute cimport Expression, _bind
from pyarrow._dataset cimport (
    _make_file_source,
    DatasetFactory,
    FileFormat,
    FileFragment,
    FileWriteOptions,
    Fragment,
    FragmentScanOptions,
    CacheOptions,
    Partitioning,
    PartitioningFactory,
    WrittenFile
)

from pyarrow._parquet cimport (
    _create_writer_properties, _create_arrow_writer_properties,
    FileMetaData,
)


# Parquet encryption support is optional: the helper module may be missing
# from this installation. ``parquet_encryption_enabled`` records whether the
# import succeeded and is checked in this module before any of the imported
# encryption helpers is called.
try:
    from pyarrow._dataset_parquet_encryption import (
        set_encryption_config, set_decryption_config, set_decryption_properties
    )
    parquet_encryption_enabled = True
except ImportError:
    parquet_encryption_enabled = False


# Scalar ``True`` expression, used as the default partition expression in
# ParquetFileFormat.make_fragment.
cdef Expression _true = Expression._scalar(True)

# Pointer alias used as the template argument of dynamic_cast in
# ParquetFileFormat._finish_write.
ctypedef CParquetFileWriter* _CParquetFileWriterPtr


cdef class ParquetFileFormat(FileFormat):
    """
    FileFormat for Parquet

    Parameters
    ----------
    read_options : ParquetReadOptions
        Read options for the file.
    default_fragment_scan_options : ParquetFragmentScanOptions
        Scan Options for the file.
    **kwargs : dict
        Additional options for read option or scan option
    """

    cdef:
        # Typed view of the wrapped C++ format object; set in ``init``.
        CParquetFileFormat* parquet_format

    def __init__(self, read_options=None,
                 default_fragment_scan_options=None,
                 **kwargs):
        cdef:
            shared_ptr[CParquetFileFormat] wrapped
            CParquetFileFormatReaderOptions* options

        # Read/scan options: keyword arguments named in _PARQUET_READ_OPTIONS
        # are read options, every other keyword is treated as a scan option.
        read_options_args = {option: kwargs[option] for option in kwargs
                             if option in _PARQUET_READ_OPTIONS}
        scan_args = {option: kwargs[option] for option in kwargs
                     if option not in _PARQUET_READ_OPTIONS}
        # An explicit options object and loose keywords of the same family
        # are mutually exclusive.
        if read_options and read_options_args:
            duplicates = ', '.join(sorted(read_options_args))
            raise ValueError(f'If `read_options` is given, '
                             f'cannot specify {duplicates}')
        if default_fragment_scan_options and scan_args:
            duplicates = ', '.join(sorted(scan_args))
            raise ValueError(f'If `default_fragment_scan_options` is given, '
                             f'cannot specify {duplicates}')

        if read_options is None:
            read_options = ParquetReadOptions(**read_options_args)
        elif isinstance(read_options, dict):
            # For backwards compatibility: a dict may mix read options and
            # scan options; the latter are moved to scan_args with a warning.
            duplicates = []
            for option, value in read_options.items():
                if option in _PARQUET_READ_OPTIONS:
                    read_options_args[option] = value
                else:
                    duplicates.append(option)
                    scan_args[option] = value
            if duplicates:
                duplicates = ", ".join(duplicates)
                warnings.warn(f'The scan options {duplicates} should be '
                              'specified directly as keyword arguments')
            read_options = ParquetReadOptions(**read_options_args)
        elif not isinstance(read_options, ParquetReadOptions):
            raise TypeError('`read_options` must be either a dictionary or an '
                            'instance of ParquetReadOptions')

        if default_fragment_scan_options is None:
            default_fragment_scan_options = ParquetFragmentScanOptions(**scan_args)
        elif isinstance(default_fragment_scan_options, dict):
            default_fragment_scan_options = ParquetFragmentScanOptions(
                **default_fragment_scan_options)
        elif not isinstance(default_fragment_scan_options,
                            ParquetFragmentScanOptions):
            raise TypeError('`default_fragment_scan_options` must be either a '
                            'dictionary or an instance of '
                            'ParquetFragmentScanOptions')

        wrapped = make_shared[CParquetFileFormat]()

        # Copy the Python-level read options into the C++ reader options.
        options = &(wrapped.get().reader_options)
        if read_options.dictionary_columns is not None:
            for column in read_options.dictionary_columns:
                options.dict_columns.insert(tobytes(column))
        options.coerce_int96_timestamp_unit = \
            read_options._coerce_int96_timestamp_unit

        self.init(<shared_ptr[CFileFormat]> wrapped)
        self.default_fragment_scan_options = default_fragment_scan_options

    cdef void init(self, const shared_ptr[CFileFormat]& sp):
        # Initialise the base wrapper, then keep a typed raw pointer to the
        # same object for Parquet-specific access.
        FileFormat.init(self, sp)
        self.parquet_format = <CParquetFileFormat*> sp.get()

    cdef WrittenFile _finish_write(self, path, base_dir,
                                   CFileWriter* file_writer):
        # Build the WrittenFile record for a finished writer: path, Parquet
        # metadata (if the writer exposes any) and number of bytes written.
        cdef:
            FileMetaData parquet_metadata
            CParquetFileWriter* parquet_file_writer

        parquet_metadata = None
        # The cast result is dereferenced without a null check, so
        # ``file_writer`` is expected to really be a CParquetFileWriter.
        parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
        with nogil:
            metadata = deref(
                deref(parquet_file_writer).parquet_writer()).metadata()
        if metadata:
            parquet_metadata = FileMetaData()
            parquet_metadata.init(metadata)
            # The file path recorded in the metadata is relative to base_dir.
            parquet_metadata.set_file_path(os.path.relpath(path, base_dir))

        size = GetResultValue(file_writer.GetBytesWritten())

        return WrittenFile(path, parquet_metadata, size)

    @property
    def read_options(self):
        """
        A new ParquetReadOptions rebuilt from the C++ reader options.
        """
        cdef CParquetFileFormatReaderOptions* options
        options = &self.parquet_format.reader_options
        parquet_read_options = ParquetReadOptions(
            dictionary_columns={frombytes(col)
                                for col in options.dict_columns},
        )
        # Read options getter/setter works with strings so setting
        # the private property which uses the C Type
        parquet_read_options._coerce_int96_timestamp_unit = \
            options.coerce_int96_timestamp_unit
        return parquet_read_options

    def make_write_options(self, **kwargs):
        """
        Create write options for this format and apply the given overrides.

        Parameters
        ----------
        **kwargs : dict
            Options forwarded to ``ParquetFileWriteOptions.update``.

        Returns
        -------
        pyarrow.dataset.FileWriteOptions
        """
        # Safeguard from calling make_write_options as a static class method
        if not isinstance(self, ParquetFileFormat):
            raise TypeError("make_write_options() should be called on "
                            "an instance of ParquetFileFormat")
        opts = FileFormat.make_write_options(self)
        (<ParquetFileWriteOptions> opts).update(**kwargs)
        return opts

    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
        # Parquet scan options are stored on the C++ format directly; any
        # other kind is delegated to the base class.
        if options.type_name == 'parquet':
            self.parquet_format.default_fragment_scan_options = options.wrapped
        else:
            super()._set_default_fragment_scan_options(options)

    def equals(self, ParquetFileFormat other):
        """
        Compare read options and default fragment scan options.

        Parameters
        ----------
        other : pyarrow.dataset.ParquetFileFormat

        Returns
        -------
        bool
        """
        return (
            self.read_options.equals(other.read_options) and
            self.default_fragment_scan_options ==
            other.default_fragment_scan_options
        )

    @property
    def default_extname(self):
        """The default file extension, ``"parquet"``."""
        return "parquet"

    def __reduce__(self):
        # Pickle as a constructor call with the two option objects.
        return ParquetFileFormat, (self.read_options,
                                   self.default_fragment_scan_options)

    def __repr__(self):
        return f"<ParquetFileFormat read_options={self.read_options}>"

    def make_fragment(self, file, filesystem=None,
                      Expression partition_expression=None, row_groups=None, *, file_size=None):
        """
        Make a FileFragment from a given file.

        Parameters
        ----------
        file : file-like object, path-like or str
            The file or file path to make a fragment from.
        filesystem : Filesystem, optional
            If `filesystem` is given, `file` must be a string and specifies
            the path of the file to read from the filesystem.
        partition_expression : Expression, optional
            An expression that is guaranteed true for all rows in the fragment.  Allows
            fragment to be potentially skipped while scanning with a filter.
        row_groups : Iterable, optional
            The indices of the row groups to include. Duplicates are dropped
            and the indices are passed on in ascending order.
        file_size : int, optional
            The size of the file in bytes. Can improve performance with high-latency filesystems
            when file size needs to be known before reading.

        Returns
        -------
        fragment : Fragment
            The file fragment
        """
        cdef:
            vector[int] c_row_groups
        if partition_expression is None:
            partition_expression = _true
        if row_groups is None:
            # No row group selection: the generic implementation is enough.
            return super().make_fragment(file, filesystem,
                                         partition_expression, file_size=file_size)

        c_source = _make_file_source(file, filesystem, file_size)
        # Deduplicate and sort: iterating a bare set gives a hash-dependent
        # order, whereas ParquetFileFragment.subset already passes row group
        # ids sorted. Sorting keeps the resulting fragment deterministic.
        c_row_groups = [<int> row_group
                        for row_group in sorted(set(row_groups))]

        c_fragment = <shared_ptr[CFragment]> GetResultValue(
            self.parquet_format.MakeFragment(move(c_source),
                                             partition_expression.unwrap(),
                                             <shared_ptr[CSchema]>nullptr,
                                             move(c_row_groups)))
        return Fragment.wrap(move(c_fragment))


class RowGroupInfo:
    """
    A wrapper class for RowGroup information

    Instances compare equal to another RowGroupInfo with the same ``id`` and
    to a plain integer equal to ``id``; the hash follows ``id`` accordingly.

    Parameters
    ----------
    id : integer
        The group ID.
    metadata : FileMetaData
        The rowgroup metadata.
    schema : Schema
        Schema of the rows.
    """

    def __init__(self, id, metadata, schema):
        self.id = id
        self.metadata = metadata
        self.schema = schema

    @property
    def num_rows(self):
        """The ``num_rows`` value of the row group metadata."""
        return self.metadata.num_rows

    @property
    def total_byte_size(self):
        """The ``total_byte_size`` value of the row group metadata."""
        return self.metadata.total_byte_size

    @property
    def statistics(self):
        """
        Min/max statistics per column, as ``{name: {'min': ..., 'max': ...}}``.

        Columns without statistics, without min/max values, or for which
        ``schema.get_field_index`` returns a negative index are left out.
        """
        result = {}
        for i in range(self.metadata.num_columns):
            col = self.metadata.column(i)

            stats = col.statistics
            if stats is None or not stats.has_min_max:
                continue

            name = col.path_in_schema
            field_index = self.schema.get_field_index(name)
            if field_index < 0:
                continue

            # Convert the raw min/max through the schema's field type so the
            # Python values match that type.
            typ = self.schema.field(field_index).type
            result[name] = {
                'min': pa.scalar(stats.min, type=typ).as_py(),
                'max': pa.scalar(stats.max, type=typ).as_py()
            }
        return result

    def __repr__(self):
        return "RowGroupInfo({})".format(self.id)

    def __eq__(self, other):
        if isinstance(other, int):
            return self.id == other
        if not isinstance(other, RowGroupInfo):
            return False
        return self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable. Hashing the
        # id keeps the hash consistent with equality against both ints and
        # other RowGroupInfo objects.
        return hash(self.id)


cdef class ParquetFileFragment(FileFragment):
    """A Fragment representing a parquet file."""

    cdef:
        # Typed view of the wrapped C++ fragment; set in ``init``.
        CParquetFileFragment* parquet_file_fragment

    cdef void init(self, const shared_ptr[CFragment]& sp):
        # Let the base class store the shared pointer, then keep the raw
        # pointer downcast to the Parquet fragment type.
        FileFragment.init(self, sp)
        self.parquet_file_fragment = <CParquetFileFragment*> sp.get()

    def __reduce__(self):
        # Rebuild through the format's make_fragment with the same source,
        # filesystem, partition expression and row group selection.
        in_memory = self.buffer
        # parquet_file_fragment.row_groups() is empty if the metadata
        # information of the file is not yet populated
        if self.parquet_file_fragment.row_groups().size() == 0:
            selected_ids = None
        else:
            selected_ids = [info.id for info in self.row_groups]

        factory = self.format.make_fragment
        source = self.path if in_memory is None else in_memory
        return factory, (
            source,
            self.filesystem,
            self.partition_expression,
            selected_ids
        )

    def ensure_complete_metadata(self):
        """
        Make sure the fragment has read and cached all of its metadata
        (statistics, physical schema, ...).
        """
        with nogil:
            check_status(self.parquet_file_fragment.EnsureCompleteMetadata())

    @property
    def row_groups(self):
        """One RowGroupInfo per row group viewed by this fragment."""
        file_metadata = self.metadata
        cdef vector[int] c_ids = self.parquet_file_fragment.row_groups()
        infos = []
        for rg_id in c_ids:
            infos.append(
                RowGroupInfo(rg_id, file_metadata.row_group(rg_id),
                             self.physical_schema))
        return infos

    @property
    def metadata(self):
        """The file metadata, loaded first if necessary."""
        self.ensure_complete_metadata()
        cdef FileMetaData wrapper = FileMetaData()
        wrapper.init(self.parquet_file_fragment.metadata())
        return wrapper

    @property
    def num_row_groups(self):
        """
        Return the number of row groups viewed by this fragment (not the
        number of row groups in the origin file).
        """
        self.ensure_complete_metadata()
        count = self.parquet_file_fragment.row_groups().size()
        return count

    def split_by_row_group(self, Expression filter=None,
                           Schema schema=None):
        """
        Split the fragment into multiple fragments.

        Yield a Fragment wrapping each row group in this ParquetFileFragment.
        Row groups will be excluded whose metadata contradicts the optional
        filter.

        Parameters
        ----------
        filter : Expression, default None
            Only include the row groups which satisfy this predicate (using
            the Parquet RowGroup statistics).
        schema : Schema, default None
            Schema to use when filtering row groups. Defaults to the
            Fragment's physical schema

        Returns
        -------
        A list of Fragments
        """
        cdef:
            vector[shared_ptr[CFragment]] c_pieces
            CExpression c_predicate
            shared_ptr[CFragment] c_piece

        # Fall back to the physical schema when no (or a falsy) schema is
        # passed, then bind the filter against it.
        schema = schema or self.physical_schema
        c_predicate = _bind(filter, schema)
        with nogil:
            c_pieces = move(GetResultValue(
                self.parquet_file_fragment.SplitByRowGroup(
                    move(c_predicate))))

        pieces = []
        for c_piece in c_pieces:
            pieces.append(Fragment.wrap(c_piece))
        return pieces

    def subset(self, Expression filter=None, Schema schema=None,
               object row_group_ids=None):
        """
        Create a subset of the fragment (viewing a subset of the row groups).

        Subset can be specified by either a filter predicate (with optional
        schema) or by a list of row group IDs. Note that when using a filter,
        the resulting fragment can be empty (viewing no row groups).

        Parameters
        ----------
        filter : Expression, default None
            Only include the row groups which satisfy this predicate (using
            the Parquet RowGroup statistics).
        schema : Schema, default None
            Schema to use when filtering row groups. Defaults to the
            Fragment's physical schema
        row_group_ids : list of ints
            The row group IDs to include in the subset. Can only be specified
            if `filter` is None.

        Returns
        -------
        ParquetFileFragment
        """
        cdef:
            CExpression c_predicate
            vector[int] c_ids
            shared_ptr[CFragment] c_subset

        by_filter = filter is not None
        by_ids = row_group_ids is not None

        # Exactly one of the two selection mechanisms must be used.
        if by_filter and by_ids:
            raise ValueError(
                "Cannot specify both 'filter' and 'row_group_ids'."
            )
        if not (by_filter or by_ids):
            raise ValueError(
                "Need to specify one of 'filter' or 'row_group_ids'"
            )

        if by_filter:
            schema = schema or self.physical_schema
            c_predicate = _bind(filter, schema)
            with nogil:
                c_subset = move(GetResultValue(
                    self.parquet_file_fragment.SubsetWithFilter(
                        move(c_predicate))))
        else:
            # Row group ids are deduplicated and passed in ascending order.
            c_ids = [<int> rg_id for rg_id in sorted(set(row_group_ids))]
            with nogil:
                c_subset = move(GetResultValue(
                    self.parquet_file_fragment.SubsetWithIds(
                        move(c_ids))))

        return Fragment.wrap(c_subset)


cdef class ParquetReadOptions(_Weakrefable):
    """
    Parquet format specific options for reading.

    Parameters
    ----------
    dictionary_columns : list of string, default None
        Names of columns which should be dictionary encoded as
        they are read
    coerce_int96_timestamp_unit : str, default None
        Cast timestamps that are stored in INT96 format to a particular
        resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
        and therefore INT96 timestamps will be inferred as timestamps
        in nanoseconds
    """

    cdef public:
        # Column names to read as dictionary encoded; always stored as a set.
        set dictionary_columns
        # C-level time unit behind the string-based
        # ``coerce_int96_timestamp_unit`` property.
        TimeUnit _coerce_int96_timestamp_unit

    # Also see _PARQUET_READ_OPTIONS
    def __init__(self, dictionary_columns=None,
                 coerce_int96_timestamp_unit=None):
        # A falsy value (None, empty container) results in an empty set.
        self.dictionary_columns = set(dictionary_columns or set())
        # Goes through the property setter, which maps None to nanoseconds.
        self.coerce_int96_timestamp_unit = coerce_int96_timestamp_unit

    @property
    def coerce_int96_timestamp_unit(self):
        """The INT96 timestamp unit, as a string."""
        return timeunit_to_string(self._coerce_int96_timestamp_unit)

    @coerce_int96_timestamp_unit.setter
    def coerce_int96_timestamp_unit(self, unit):
        if unit is not None:
            self._coerce_int96_timestamp_unit = string_to_timeunit(unit)
        else:
            self._coerce_int96_timestamp_unit = TimeUnit_NANO

    def equals(self, ParquetReadOptions other):
        """
        Compare dictionary columns and INT96 timestamp unit.

        Parameters
        ----------
        other : pyarrow.dataset.ParquetReadOptions

        Returns
        -------
        bool
            False if `other` is None.
        """
        # A typed argument still accepts None, and reading the cdef attribute
        # ``dictionary_columns`` from None is not safe, so reject it here.
        if other is None:
            return False
        return (self.dictionary_columns == other.dictionary_columns and
                self.coerce_int96_timestamp_unit ==
                other.coerce_int96_timestamp_unit)

    def __eq__(self, other):
        # ``equals`` raises TypeError for objects of another type; treat
        # those as simply not equal.
        try:
            return self.equals(other)
        except TypeError:
            return False

    def __repr__(self):
        return (
            f"<ParquetReadOptions"
            f" dictionary_columns={self.dictionary_columns}"
            f" coerce_int96_timestamp_unit={self.coerce_int96_timestamp_unit}>"
        )


cdef class ParquetFileWriteOptions(FileWriteOptions):
    """
    Write options for Parquet files.

    The option values are kept in the ``_properties`` dict (filled with
    defaults in ``init``) and pushed into the wrapped C++ options by the
    ``_set_*`` methods.
    """

    def update(self, **kwargs):
        """
        Update write options and rebuild the affected C++ properties.

        Parameters
        ----------
        **kwargs : dict
            Option names and their new values. Each name must be a key of
            the options dict initialised in ``init``.

        Raises
        ------
        TypeError
            If an option name is not recognised. No option is modified in
            that case.
        """
        arrow_fields = {
            "use_deprecated_int96_timestamps",
            "coerce_timestamps",
            "allow_truncated_timestamps",
            "use_compliant_nested_type",
        }

        # Validate every name before touching any state: the setters only run
        # after all values are stored, so failing midway would leave
        # ``_properties`` out of sync with the C++ properties.
        for name in kwargs:
            if name not in self._properties:
                raise TypeError("unexpected parquet write option: " + name)

        # Collect each required setter once, whatever the number of options
        # that map to it.
        setters = set()
        for name, value in kwargs.items():
            self._properties[name] = value
            if name in arrow_fields:
                setters.add(self._set_arrow_properties)
            elif name == "encryption_config" and value is not None:
                setters.add(self._set_encryption_config)
            else:
                setters.add(self._set_properties)

        for setter in setters:
            setter()

    def _set_properties(self):
        # Rebuild the Parquet writer properties from the current option values.
        cdef CParquetFileWriteOptions* opts = self.parquet_options

        opts.writer_properties = _create_writer_properties(
            use_dictionary=self._properties["use_dictionary"],
            compression=self._properties["compression"],
            version=self._properties["version"],
            write_statistics=self._properties["write_statistics"],
            data_page_size=self._properties["data_page_size"],
            compression_level=self._properties["compression_level"],
            use_byte_stream_split=(
                self._properties["use_byte_stream_split"]
            ),
            column_encoding=self._properties["column_encoding"],
            data_page_version=self._properties["data_page_version"],
            encryption_properties=self._properties["encryption_properties"],
            write_batch_size=self._properties["write_batch_size"],
            dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"],
            write_page_index=self._properties["write_page_index"],
            write_page_checksum=self._properties["write_page_checksum"],
            sorting_columns=self._properties["sorting_columns"],
            store_decimal_as_integer=self._properties["store_decimal_as_integer"],
        )

    def _set_arrow_properties(self):
        # Rebuild the Arrow writer properties from the current option values.
        # The writer engine version is always "V2".
        cdef CParquetFileWriteOptions* opts = self.parquet_options

        opts.arrow_writer_properties = _create_arrow_writer_properties(
            use_deprecated_int96_timestamps=(
                self._properties["use_deprecated_int96_timestamps"]
            ),
            coerce_timestamps=self._properties["coerce_timestamps"],
            allow_truncated_timestamps=(
                self._properties["allow_truncated_timestamps"]
            ),
            writer_engine_version="V2",
            use_compliant_nested_type=(
                self._properties["use_compliant_nested_type"]
            )
        )

    def _set_encryption_config(self):
        # Apply the stored encryption config; only possible when the optional
        # encryption module was imported successfully.
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Encryption is not enabled in your installation of pyarrow, but an "
                "encryption_config was provided."
            )
        set_encryption_config(self, self._properties["encryption_config"])

    cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
        # Wrap the C++ options, install the default option values and push
        # them into both writer property objects.
        FileWriteOptions.init(self, sp)
        self.parquet_options = <CParquetFileWriteOptions*> sp.get()
        self._properties = dict(
            use_dictionary=True,
            compression="snappy",
            version="2.6",
            write_statistics=None,
            data_page_size=None,
            compression_level=None,
            use_byte_stream_split=False,
            column_encoding=None,
            data_page_version="1.0",
            use_deprecated_int96_timestamps=False,
            coerce_timestamps=None,
            allow_truncated_timestamps=False,
            use_compliant_nested_type=True,
            encryption_properties=None,
            write_batch_size=None,
            dictionary_pagesize_limit=None,
            write_page_index=False,
            encryption_config=None,
            write_page_checksum=False,
            sorting_columns=None,
            store_decimal_as_integer=False,
        )

        self._set_properties()
        self._set_arrow_properties()

    def __repr__(self):
        return "<pyarrow.dataset.ParquetFileWriteOptions {0}>".format(
            " ".join([f"{key}={value}" for key, value in self._properties.items()])
        )


# Names of the keyword arguments accepted by ParquetReadOptions.
# ParquetFileFormat.__init__ uses this set to split its **kwargs into read
# options (names listed here) and scan options (everything else).
cdef set _PARQUET_READ_OPTIONS = {
    'dictionary_columns', 'coerce_int96_timestamp_unit'
}


cdef class ParquetFragmentScanOptions(FragmentScanOptions):
    """
    Scan-specific options for Parquet fragments.

    Parameters
    ----------
    use_buffered_stream : bool, default False
        Read files through buffered input streams rather than loading entire
        row groups at once. This may be enabled to reduce memory overhead.
        Disabled by default.
    buffer_size : int, default 8192
        Size of buffered stream, if enabled. Default is 8KB.
    pre_buffer : bool, default True
        If enabled, pre-buffer the raw Parquet data instead of issuing one
        read per column chunk. This can improve performance on high-latency
        filesystems (e.g. S3, GCS) by coalescing and issuing file reads in
        parallel using a background I/O thread pool.
        Set to False if you want to prioritize minimal memory usage
        over maximum speed.
    cache_options : pyarrow.CacheOptions, default None
        Cache options used when pre_buffer is enabled. The default values should
        be good for most use cases. You may want to adjust these for example if
        you have exceptionally high latency to the file system. 
    thrift_string_size_limit : int, default None
        If not None, override the maximum total string size allocated
        when decoding Thrift structures. The default limit should be
        sufficient for most Parquet files.
    thrift_container_size_limit : int, default None
        If not None, override the maximum total size of containers allocated
        when decoding Thrift structures. The default limit should be
        sufficient for most Parquet files.
    decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
        If not None, use the provided ParquetDecryptionConfig to decrypt the
        Parquet file.
    decryption_properties : pyarrow.parquet.FileDecryptionProperties, default None
        If not None, use the provided FileDecryptionProperties to decrypt encrypted
        Parquet file.
    page_checksum_verification : bool, default False
        If True, verify the page checksum for each page read from the file.
    """

    # Avoid mistakingly creating attributes
    __slots__ = ()

    def __init__(self, *, bint use_buffered_stream=False,
                 buffer_size=8192,
                 bint pre_buffer=True,
                 cache_options=None,
                 thrift_string_size_limit=None,
                 thrift_container_size_limit=None,
                 decryption_config=None,
                 decryption_properties=None,
                 bint page_checksum_verification=False):
        self.init(shared_ptr[CFragmentScanOptions](
            new CParquetFragmentScanOptions()))
        self.use_buffered_stream = use_buffered_stream
        self.buffer_size = buffer_size
        if pre_buffer and not is_threading_enabled():
            pre_buffer = False
        self.pre_buffer = pre_buffer
        if cache_options is not None:
            self.cache_options = cache_options
        if thrift_string_size_limit is not None:
            self.thrift_string_size_limit = thrift_string_size_limit
        if thrift_container_size_limit is not None:
            self.thrift_container_size_limit = thrift_container_size_limit
        if decryption_config is not None:
            self.parquet_decryption_config = decryption_config
        if decryption_properties is not None:
            self.decryption_properties = decryption_properties
        self.page_checksum_verification = page_checksum_verification

    cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
        FragmentScanOptions.init(self, sp)
        self.parquet_options = <CParquetFragmentScanOptions*> sp.get()

    cdef CReaderProperties* reader_properties(self):
        return self.parquet_options.reader_properties.get()

    cdef ArrowReaderProperties* arrow_reader_properties(self):
        return self.parquet_options.arrow_reader_properties.get()

    @property
    def use_buffered_stream(self):
        return self.reader_properties().is_buffered_stream_enabled()

    @use_buffered_stream.setter
    def use_buffered_stream(self, bint use_buffered_stream):
        if use_buffered_stream:
            self.reader_properties().enable_buffered_stream()
        else:
            self.reader_properties().disable_buffered_stream()

    @property
    def buffer_size(self):
        return self.reader_properties().buffer_size()

    @buffer_size.setter
    def buffer_size(self, buffer_size):
        if buffer_size <= 0:
            raise ValueError("Buffer size must be larger than zero")
        self.reader_properties().set_buffer_size(buffer_size)

    @property
    def pre_buffer(self):
        return self.arrow_reader_properties().pre_buffer()

    @pre_buffer.setter
    def pre_buffer(self, bint pre_buffer):
        if pre_buffer and not is_threading_enabled():
            return
        self.arrow_reader_properties().set_pre_buffer(pre_buffer)

    @property
    def cache_options(self):
        return CacheOptions.wrap(self.arrow_reader_properties().cache_options())

    @cache_options.setter
    def cache_options(self, CacheOptions options):
        self.arrow_reader_properties().set_cache_options(options.unwrap())

    @property
    def thrift_string_size_limit(self):
        return self.reader_properties().thrift_string_size_limit()

    @thrift_string_size_limit.setter
    def thrift_string_size_limit(self, size):
        if size <= 0:
            raise ValueError("size must be larger than zero")
        self.reader_properties().set_thrift_string_size_limit(size)

    @property
    def thrift_container_size_limit(self):
        return self.reader_properties().thrift_container_size_limit()

    @thrift_container_size_limit.setter
    def thrift_container_size_limit(self, size):
        if size <= 0:
            raise ValueError("size must be larger than zero")
        self.reader_properties().set_thrift_container_size_limit(size)

    @property
    def decryption_properties(self):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Unable to access encryption features. "
                "Encryption is not enabled in your installation of pyarrow."
            )
        return self._decryption_properties

    @decryption_properties.setter
    def decryption_properties(self, config):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Encryption is not enabled in your installation of pyarrow, but "
                "decryption_properties were provided."
            )
        set_decryption_properties(self, config)
        self._decryption_properties = config

    @property
    def parquet_decryption_config(self):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Unable to access encryption features. "
                "Encryption is not enabled in your installation of pyarrow."
            )
        return self._parquet_decryption_config

    @parquet_decryption_config.setter
    def parquet_decryption_config(self, config):
        if parquet_encryption_enabled:
            # Apply the config first; the Python-side reference is only
            # recorded once that call has returned without raising.
            set_decryption_config(self, config)
            self._parquet_decryption_config = config
        else:
            raise NotImplementedError(
                "Encryption is not enabled in your installation of pyarrow, but a "
                "decryption_config was provided."
            )

    @property
    def page_checksum_verification(self):
        """
        The page checksum verification flag currently held by the reader
        properties of these scan options.
        """
        reader_props = self.reader_properties()
        return reader_props.page_checksum_verification()

    @page_checksum_verification.setter
    def page_checksum_verification(self, bint page_checksum_verification):
        # The argument is declared ``bint``, so it reaches the reader
        # properties as a C boolean.
        reader_props = self.reader_properties()
        reader_props.set_page_checksum_verification(
            page_checksum_verification)

    def equals(self, ParquetFragmentScanOptions other):
        """
        Compare these scan options with another instance, attribute by
        attribute.

        Parameters
        ----------
        other : pyarrow.dataset.ParquetFragmentScanOptions

        Returns
        -------
        bool
        """
        # NOTE: only the settings listed below take part in the comparison;
        # the decryption-related attributes are not included.
        mine = (
            self.use_buffered_stream,
            self.buffer_size,
            self.pre_buffer,
            self.cache_options,
            self.thrift_string_size_limit,
            self.thrift_container_size_limit,
            self.page_checksum_verification,
        )
        theirs = (
            other.use_buffered_stream,
            other.buffer_size,
            other.pre_buffer,
            other.cache_options,
            other.thrift_string_size_limit,
            other.thrift_container_size_limit,
            other.page_checksum_verification,
        )
        return mine == theirs

    @staticmethod
    def _reconstruct(kwargs):
        # The callable returned from __reduce__ is invoked with positional
        # arguments only, so the keyword arguments travel as a single dict
        # that is expanded here.
        rebuilt = ParquetFragmentScanOptions(**kwargs)
        return rebuilt

    def __reduce__(self):
        # Snapshot the constructor keyword arguments; _reconstruct expands
        # them back into a ParquetFragmentScanOptions call.
        # NOTE: the decryption-related attributes are not part of this state.
        state = {
            "use_buffered_stream": self.use_buffered_stream,
            "buffer_size": self.buffer_size,
            "pre_buffer": self.pre_buffer,
            "cache_options": self.cache_options,
            "thrift_string_size_limit": self.thrift_string_size_limit,
            "thrift_container_size_limit": self.thrift_container_size_limit,
            "page_checksum_verification": self.page_checksum_verification,
        }
        return (ParquetFragmentScanOptions._reconstruct, (state,))


cdef class ParquetFactoryOptions(_Weakrefable):
    """
    Influences the discovery of parquet dataset.

    Parameters
    ----------
    partition_base_dir : str, optional
        For the purposes of applying the partitioning, paths will be
        stripped of the partition_base_dir. Files not matching the
        partition_base_dir prefix will be skipped for partitioning discovery.
        The ignored files will still be part of the Dataset, but will not
        have partition information.
    partitioning : Partitioning, PartitioningFactory, optional
        The partitioning scheme applied to fragments, see ``Partitioning``.
    validate_column_chunk_paths : bool, default False
        Assert that all ColumnChunk paths are consistent. The parquet spec
        allows for ColumnChunk data to be stored in multiple files, but
        ParquetDatasetFactory supports only a single file with all ColumnChunk
        data. If this flag is set construction of a ParquetDatasetFactory will
        raise an error if ColumnChunk data is not resident in a single file.
    """

    cdef:
        # C-level options struct that every property below reads or writes.
        CParquetFactoryOptions options

    __slots__ = ()  # avoid mistakenly creating attributes

    def __init__(self, partition_base_dir=None, partitioning=None,
                 validate_column_chunk_paths=False):
        # Route the value through the setter matching its concrete type.
        # Any other value (including None) matches neither branch, so the
        # partitioning of the underlying options is left as it is.
        if isinstance(partitioning, PartitioningFactory):
            self.partitioning_factory = partitioning
        elif isinstance(partitioning, Partitioning):
            self.partitioning = partitioning

        # Only override the base directory when one was explicitly given.
        if partition_base_dir is not None:
            self.partition_base_dir = partition_base_dir

        self.options.validate_column_chunk_paths = validate_column_chunk_paths

    cdef inline CParquetFactoryOptions unwrap(self):
        # Hand the C-level options struct to callers (returned by value, per
        # the declared return type).
        return self.options

    @property
    def partitioning(self):
        """Partitioning to apply to discovered files.

        Returns None when no Partitioning is currently set.

        NOTE: setting this property will overwrite partitioning_factory.
        """
        c_partitioning = self.options.partitioning.partitioning()
        if c_partitioning.get() == nullptr:
            return None
        return Partitioning.wrap(c_partitioning)

    @partitioning.setter
    def partitioning(self, Partitioning value):
        # Writes the same ``options.partitioning`` field as the
        # partitioning_factory setter, hence the mutual overwrite.
        self.options.partitioning = (<Partitioning> value).unwrap()

    @property
    def partitioning_factory(self):
        """PartitioningFactory to apply to discovered files and
        discover a Partitioning.

        Returns None when no PartitioningFactory is currently set.

        NOTE: setting this property will overwrite partitioning.
        """
        c_factory = self.options.partitioning.factory()
        if c_factory.get() == nullptr:
            return None
        return PartitioningFactory.wrap(c_factory, None, None)

    @partitioning_factory.setter
    def partitioning_factory(self, PartitioningFactory value):
        # Writes the same ``options.partitioning`` field as the partitioning
        # setter, hence the mutual overwrite.
        self.options.partitioning = (<PartitioningFactory> value).unwrap()

    @property
    def partition_base_dir(self):
        """
        Base directory to strip paths before applying the partitioning.
        """
        return frombytes(self.options.partition_base_dir)

    @partition_base_dir.setter
    def partition_base_dir(self, value):
        # The options struct stores the path as bytes; frombytes() in the
        # getter performs the reverse conversion.
        self.options.partition_base_dir = tobytes(value)

    @property
    def validate_column_chunk_paths(self):
        """
        Whether to assert that all ColumnChunk paths are consistent, i.e.
        that all ColumnChunk data is resident in a single file.
        """
        return self.options.validate_column_chunk_paths

    @validate_column_chunk_paths.setter
    def validate_column_chunk_paths(self, value):
        self.options.validate_column_chunk_paths = value


cdef class ParquetDatasetFactory(DatasetFactory):
    """
    Create a ParquetDatasetFactory from a Parquet `_metadata` file.

    Parameters
    ----------
    metadata_path : str
        Path to the `_metadata` parquet metadata-only file generated with
        `pyarrow.parquet.write_metadata`.
    filesystem : pyarrow.fs.FileSystem
        Filesystem to read the metadata_path from, and subsequent parquet
        files.
    format : ParquetFileFormat
        Parquet format options. Any other kind of FileFormat is rejected
        with a TypeError.
    options : ParquetFactoryOptions, optional
        Various flags influencing the discovery of filesystem paths.
        When omitted, a default-constructed ParquetFactoryOptions is used.

    Raises
    ------
    TypeError
        If `format` is not a ParquetFileFormat.
    """

    cdef:
        # Typed view of the factory pointer handed to ``init``.
        CParquetDatasetFactory* parquet_factory

    def __init__(self, metadata_path, FileSystem filesystem not None,
                 FileFormat format not None,
                 ParquetFactoryOptions options=None):
        cdef:
            c_string c_path
            shared_ptr[CFileSystem] c_filesystem
            shared_ptr[CParquetFileFormat] c_format
            CResult[shared_ptr[CDatasetFactory]] result
            CParquetFactoryOptions c_options

        # The signature accepts any FileFormat, but the pointer cast below
        # is unchecked and only valid for Parquet formats. Reject anything
        # else here instead of reinterpreting an unrelated C++ object.
        if not isinstance(format, ParquetFileFormat):
            raise TypeError(
                "format must be a ParquetFileFormat, got {}".format(
                    type(format).__name__))

        if options is None:
            options = ParquetFactoryOptions()

        c_path = tobytes(metadata_path)
        c_filesystem = filesystem.unwrap()
        c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
            format.unwrap())
        c_options = options.unwrap()

        # Only C-level values are used from here on, so the GIL can be
        # released while the factory is being created.
        with nogil:
            result = CParquetDatasetFactory.MakeFromMetaDataPath(
                c_path, c_filesystem, c_format, c_options)
        self.init(GetResultValue(result))

    cdef init(self, shared_ptr[CDatasetFactory]& sp):
        # Let the base class store the shared pointer, then keep a raw
        # pointer to the same object cast to the Parquet-specific type.
        DatasetFactory.init(self, sp)
        self.parquet_factory = <CParquetDatasetFactory*> sp.get()