# arrow-nightlies / pyarrow 19.0.0.dev65 - _dataset_parquet.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

"""Dataset support for Parquet file format."""

from cython cimport binding
from cython.operator cimport dereference as deref

import os
import warnings

import pyarrow as pa
from pyarrow.lib cimport *
from pyarrow.lib import frombytes, tobytes, is_threading_enabled
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.includes.libarrow_dataset_parquet cimport *
from pyarrow._fs cimport FileSystem

from pyarrow._compute cimport Expression, _bind
from pyarrow._dataset cimport (
    _make_file_source,
    DatasetFactory,
    FileFormat,
    FileFragment,
    FileWriteOptions,
    Fragment,
    FragmentScanOptions,
    CacheOptions,
    Partitioning,
    PartitioningFactory,
    WrittenFile
)

from pyarrow._parquet cimport (
    _create_writer_properties, _create_arrow_writer_properties,
    FileMetaData,
)


try:
    from pyarrow._dataset_parquet_encryption import (
        set_encryption_config, set_decryption_config, set_decryption_properties
    )
    parquet_encryption_enabled = True
except ImportError:
    parquet_encryption_enabled = False


cdef Expression _true = Expression._scalar(True)

ctypedef CParquetFileWriter* _CParquetFileWriterPtr


cdef class ParquetFileFormat(FileFormat):
    """
    FileFormat for Parquet

    Parameters
    ----------
    read_options : ParquetReadOptions
        Read options for the file.
    default_fragment_scan_options : ParquetFragmentScanOptions
        Scan options for the file.
    **kwargs : dict
        Additional read or scan options, passed through to the
        ParquetReadOptions or ParquetFragmentScanOptions constructors.
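
    Examples
    --------
    A minimal sketch of how ``**kwargs`` are split: ``dictionary_columns``
    is a read option while ``pre_buffer`` is a scan option:

    >>> import pyarrow.dataset as ds
    >>> fmt = ds.ParquetFileFormat(dictionary_columns={"a"}, pre_buffer=True)
    >>> fmt.read_options.dictionary_columns
    {'a'}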
    """

    cdef:
        CParquetFileFormat* parquet_format

    def __init__(self, read_options=None,
                 default_fragment_scan_options=None,
                 **kwargs):
        cdef:
            shared_ptr[CParquetFileFormat] wrapped
            CParquetFileFormatReaderOptions* options

        # Read/scan options
        read_options_args = {option: kwargs[option] for option in kwargs
                             if option in _PARQUET_READ_OPTIONS}
        scan_args = {option: kwargs[option] for option in kwargs
                     if option not in _PARQUET_READ_OPTIONS}
        if read_options and read_options_args:
            duplicates = ', '.join(sorted(read_options_args))
            raise ValueError(f'If `read_options` is given, '
                             f'cannot specify {duplicates}')
        if default_fragment_scan_options and scan_args:
            duplicates = ', '.join(sorted(scan_args))
            raise ValueError(f'If `default_fragment_scan_options` is given, '
                             f'cannot specify {duplicates}')

        if read_options is None:
            read_options = ParquetReadOptions(**read_options_args)
        elif isinstance(read_options, dict):
            # For backwards compatibility
            duplicates = []
            for option, value in read_options.items():
                if option in _PARQUET_READ_OPTIONS:
                    read_options_args[option] = value
                else:
                    duplicates.append(option)
                    scan_args[option] = value
            if duplicates:
                duplicates = ", ".join(duplicates)
                warnings.warn(f'The scan options {duplicates} should be '
                              'specified directly as keyword arguments')
            read_options = ParquetReadOptions(**read_options_args)
        elif not isinstance(read_options, ParquetReadOptions):
            raise TypeError('`read_options` must be either a dictionary or an '
                            'instance of ParquetReadOptions')

        if default_fragment_scan_options is None:
            default_fragment_scan_options = ParquetFragmentScanOptions(**scan_args)
        elif isinstance(default_fragment_scan_options, dict):
            default_fragment_scan_options = ParquetFragmentScanOptions(
                **default_fragment_scan_options)
        elif not isinstance(default_fragment_scan_options,
                            ParquetFragmentScanOptions):
            raise TypeError('`default_fragment_scan_options` must be either a '
                            'dictionary or an instance of '
                            'ParquetFragmentScanOptions')

        wrapped = make_shared[CParquetFileFormat]()

        options = &(wrapped.get().reader_options)
        if read_options.dictionary_columns is not None:
            for column in read_options.dictionary_columns:
                options.dict_columns.insert(tobytes(column))
        options.coerce_int96_timestamp_unit = \
            read_options._coerce_int96_timestamp_unit

        self.init(<shared_ptr[CFileFormat]> wrapped)
        self.default_fragment_scan_options = default_fragment_scan_options

    cdef void init(self, const shared_ptr[CFileFormat]& sp):
        FileFormat.init(self, sp)
        self.parquet_format = <CParquetFileFormat*> sp.get()

    cdef WrittenFile _finish_write(self, path, base_dir,
                                   CFileWriter* file_writer):
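        # Downcast to the Parquet writer to pull out the file metadata (if
        # present), record the path relative to base_dir on it, and report
        # the number of bytes written.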
        cdef:
            FileMetaData parquet_metadata
            CParquetFileWriter* parquet_file_writer

        parquet_metadata = None
        parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
        with nogil:
            metadata = deref(
                deref(parquet_file_writer).parquet_writer()).metadata()
        if metadata:
            parquet_metadata = FileMetaData()
            parquet_metadata.init(metadata)
            parquet_metadata.set_file_path(os.path.relpath(path, base_dir))

        size = GetResultValue(file_writer.GetBytesWritten())

        return WrittenFile(path, parquet_metadata, size)

    @property
    def read_options(self):
        cdef CParquetFileFormatReaderOptions* options
        options = &self.parquet_format.reader_options
        parquet_read_options = ParquetReadOptions(
            dictionary_columns={frombytes(col)
                                for col in options.dict_columns},
        )
        # The read options getter/setter works with strings, so set the
        # private property, which uses the C type, directly.
        parquet_read_options._coerce_int96_timestamp_unit = \
            options.coerce_int96_timestamp_unit
        return parquet_read_options

    def make_write_options(self, **kwargs):
        """
        Parameters
        ----------
        **kwargs : dict

        Returns
        -------
        pyarrow.dataset.FileWriteOptions
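
        Examples
        --------
        A sketch; ``compression`` is a standard Parquet writer property
        understood by ParquetFileWriteOptions.update() (assumed here, it is
        not defined in this file):

        >>> import pyarrow.dataset as ds
        >>> fmt = ds.ParquetFileFormat()
        >>> opts = fmt.make_write_options(compression="snappy")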
        """
        # Safeguard from calling make_write_options as a static class method
        if not isinstance(self, ParquetFileFormat):
            raise TypeError("make_write_options() should be called on "
                            "an instance of ParquetFileFormat")
        opts = FileFormat.make_write_options(self)
        (<ParquetFileWriteOptions> opts).update(**kwargs)
        return opts

    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
        if options.type_name == 'parquet':
            self.parquet_format.default_fragment_scan_options = options.wrapped
        else:
            super()._set_default_fragment_scan_options(options)

    def equals(self, ParquetFileFormat other):
        """
        Parameters
        ----------
        other : pyarrow.dataset.ParquetFileFormat

        Returns
        -------
        bool
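
        Examples
        --------
        >>> import pyarrow.dataset as ds
        >>> ds.ParquetFileFormat().equals(ds.ParquetFileFormat())
        True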
        """
        return (
            self.read_options.equals(other.read_options) and
            self.default_fragment_scan_options ==
            other.default_fragment_scan_options
        )

    @property
    def default_extname(self):
        return "parquet"

    def __reduce__(self):
        return ParquetFileFormat, (self.read_options,
                                   self.default_fragment_scan_options)

    def __repr__(self):
        return f"<ParquetFileFormat read_options={self.read_options}>"

    def make_fragment(self, file, filesystem=None,
                      Expression partition_expression=None, row_groups=None, *, file_size=None):
        """
        Make a FileFragment from a given file.

        Parameters
        ----------
        file : file-like object, path-like or str
            The file or file path to make a fragment from.
        filesystem : Filesystem, optional
            If `filesystem` is given, `file` must be a string and specifies
            the path of the file to read from the filesystem.
        partition_expression : Expression, optional
            An expression that is guaranteed true for all rows in the fragment.  Allows
            fragment to be potentially skipped while scanning with a filter.
        row_groups : Iterable, optional
            The indices of the row groups to include.
        file_size : int, optional
            The size of the file in bytes. Can improve performance with high-latency filesystems
            when file size needs to be known before reading.

        Returns
        -------
        fragment : Fragment
            The file fragment.
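
        Examples
        --------
        A sketch, assuming a local Parquet file "data.parquet" (a
        hypothetical path, not part of this module):

        >>> import pyarrow.dataset as ds
        >>> fmt = ds.ParquetFileFormat()
        >>> fragment = fmt.make_fragment("data.parquet", row_groups={0})  # doctest: +SKIP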
        """
        cdef:
            vector[int] c_row_groups
        if partition_expression is None:
            partition_expression = _true
        if row_groups is None:
            return super().make_fragment(file, filesystem,
                                         partition_expression, file_size=file_size)

        c_source = _make_file_source(file, filesystem, file_size)
        c_row_groups = [<int> row_group for row_group in set(row_groups)]

        c_fragment = <shared_ptr[CFragment]> GetResultValue(
            self.parquet_format.MakeFragment(move(c_source),
                                             partition_expression.unwrap(),
                                             <shared_ptr[CSchema]>nullptr,
                                             move(c_row_groups)))
        return Fragment.wrap(move(c_fragment))


class RowGroupInfo:
    """
    A wrapper class for row group information.

    Parameters
    ----------
    id : integer
        The group ID.
    metadata : FileMetaData
        The row group metadata.
    schema : Schema
        Schema of the rows.
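
    Examples
    --------
    Instances are normally obtained from a ParquetFileFragment rather than
    constructed directly (sketch; ``fragment`` is assumed to exist):

    >>> info = fragment.row_groups[0]  # doctest: +SKIP
    >>> info.num_rows  # doctest: +SKIP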
    """

    def __init__(self, id, metadata, schema):
        self.id = id
        self.metadata = metadata
        self.schema = schema

    @property
    def num_rows(self):
        return self.metadata.num_rows

    @property
    def total_byte_size(self):
        return self.metadata.total_byte_size

    @property
    def statistics(self):
        def name_stats(i):
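            # Map a column index to (name, {'min': ..., 'max': ...}) using
            # row-group statistics; return (None, None) when statistics are
            # missing or the column is not present in the schema.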
            col = self.metadata.column(i)

            stats = col.statistics
            if stats is None or not stats.has_min_max:
                return None, None

            name = col.path_in_schema
            field_index = self.schema.get_field_index(name)
            if field_index < 0:
                return None, None

            typ = self.schema.field(field_index).type
            return col.path_in_schema, {
                'min': pa.scalar(stats.min, type=typ).as_py(),
                'max': pa.scalar(stats.max, type=typ).as_py()
            }

        return {
            name: stats for name, stats
            in map(name_stats, range(self.metadata.num_columns))
            if stats is not None
        }

    def __repr__(self):
        return "RowGroupInfo({})".format(self.id)

    def __eq__(self, other):
        if isinstance(other, int):
            return self.id == other
        if not isinstance(other, RowGroupInfo):
            return False
        return (self.id == other.id and
                self.metadata == other.metadata and
                self.schema == other.schema)