# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: language_level = 3

"""Dataset support for Parquet file format."""

from cython cimport binding
from cython.operator cimport dereference as deref

import os
import warnings

import pyarrow as pa
from pyarrow.lib cimport *
from pyarrow.lib import frombytes, tobytes, is_threading_enabled
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.includes.libarrow_dataset_parquet cimport *
from pyarrow._fs cimport FileSystem
from pyarrow._compute cimport Expression, _bind
from pyarrow._dataset cimport (
_make_file_source,
DatasetFactory,
FileFormat,
FileFragment,
FileWriteOptions,
Fragment,
FragmentScanOptions,
CacheOptions,
Partitioning,
PartitioningFactory,
WrittenFile
)
from pyarrow._parquet cimport (
_create_writer_properties, _create_arrow_writer_properties,
FileMetaData,
)
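
# Parquet encryption support is an optional component of the pyarrow build;
# detect at import time whether this build provides it.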
try:
from pyarrow._dataset_parquet_encryption import (
set_encryption_config, set_decryption_config, set_decryption_properties
)
parquet_encryption_enabled = True
except ImportError:
parquet_encryption_enabled = False

cdef Expression _true = Expression._scalar(True)

ctypedef CParquetFileWriter* _CParquetFileWriterPtr


cdef class ParquetFileFormat(FileFormat):
"""
    FileFormat for Parquet

    Parameters
    ----------
    read_options : ParquetReadOptions
        Read options for the file.
    default_fragment_scan_options : ParquetFragmentScanOptions
        Scan options for the file.
    **kwargs : dict
        Additional options passed to ParquetReadOptions or
        ParquetFragmentScanOptions, matched by option name.
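
    Examples
    --------
    A minimal usage sketch; the column name is illustrative only:

    >>> import pyarrow.dataset as ds
    >>> parquet_format = ds.ParquetFileFormat(
    ...     read_options=ds.ParquetReadOptions(dictionary_columns={"a"}))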
"""

    cdef:
        CParquetFileFormat* parquet_format

    def __init__(self, read_options=None,
default_fragment_scan_options=None,
**kwargs):
cdef:
shared_ptr[CParquetFileFormat] wrapped
            CParquetFileFormatReaderOptions* options

        # Read/scan options
read_options_args = {option: kwargs[option] for option in kwargs
if option in _PARQUET_READ_OPTIONS}
scan_args = {option: kwargs[option] for option in kwargs
if option not in _PARQUET_READ_OPTIONS}
if read_options and read_options_args:
duplicates = ', '.join(sorted(read_options_args))
raise ValueError(f'If `read_options` is given, '
f'cannot specify {duplicates}')
if default_fragment_scan_options and scan_args:
duplicates = ', '.join(sorted(scan_args))
raise ValueError(f'If `default_fragment_scan_options` is given, '
f'cannot specify {duplicates}')
if read_options is None:
read_options = ParquetReadOptions(**read_options_args)
elif isinstance(read_options, dict):
# For backwards compatibility
duplicates = []
for option, value in read_options.items():
if option in _PARQUET_READ_OPTIONS:
read_options_args[option] = value
else:
duplicates.append(option)
scan_args[option] = value
if duplicates:
duplicates = ", ".join(duplicates)
warnings.warn(f'The scan options {duplicates} should be '
'specified directly as keyword arguments')
read_options = ParquetReadOptions(**read_options_args)
elif not isinstance(read_options, ParquetReadOptions):
raise TypeError('`read_options` must be either a dictionary or an '
'instance of ParquetReadOptions')
if default_fragment_scan_options is None:
default_fragment_scan_options = ParquetFragmentScanOptions(**scan_args)
elif isinstance(default_fragment_scan_options, dict):
default_fragment_scan_options = ParquetFragmentScanOptions(
**default_fragment_scan_options)
elif not isinstance(default_fragment_scan_options,
ParquetFragmentScanOptions):
raise TypeError('`default_fragment_scan_options` must be either a '
'dictionary or an instance of '
'ParquetFragmentScanOptions')
wrapped = make_shared[CParquetFileFormat]()
options = &(wrapped.get().reader_options)
if read_options.dictionary_columns is not None:
for column in read_options.dictionary_columns:
options.dict_columns.insert(tobytes(column))
options.coerce_int96_timestamp_unit = \
read_options._coerce_int96_timestamp_unit
self.init(<shared_ptr[CFileFormat]> wrapped)
self.default_fragment_scan_options = default_fragment_scan_options

    cdef void init(self, const shared_ptr[CFileFormat]& sp):
        FileFormat.init(self, sp)
        self.parquet_format = <CParquetFileFormat*> sp.get()

    cdef WrittenFile _finish_write(self, path, base_dir,
CFileWriter* file_writer):
cdef:
FileMetaData parquet_metadata
            CParquetFileWriter* parquet_file_writer

        parquet_metadata = None
parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
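        # Pull the Parquet metadata out of the writer so callers (e.g. a
        # file_visitor passed to write_dataset) can inspect it afterwards.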
with nogil:
metadata = deref(
deref(parquet_file_writer).parquet_writer()).metadata()
if metadata:
parquet_metadata = FileMetaData()
parquet_metadata.init(metadata)
parquet_metadata.set_file_path(os.path.relpath(path, base_dir))
size = GetResultValue(file_writer.GetBytesWritten())
return WrittenFile(path, parquet_metadata, size)

    @property
    def read_options(self):
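        """
        The ParquetReadOptions currently set on this format.
        """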
cdef CParquetFileFormatReaderOptions* options
options = &self.parquet_format.reader_options
parquet_read_options = ParquetReadOptions(
dictionary_columns={frombytes(col)
for col in options.dict_columns},
)
        # The public getter/setter works with string unit names, while the
        # C-level option stores a TimeUnit enum, so set it through the
        # private property.
parquet_read_options._coerce_int96_timestamp_unit = \
options.coerce_int96_timestamp_unit
return parquet_read_options

    def make_write_options(self, **kwargs):
        """
        Create write options for Parquet files.

        Parameters
        ----------
        **kwargs : dict
            Options passed to ParquetFileWriteOptions.update()
            (for example ``compression``).

        Returns
        -------
        pyarrow.dataset.FileWriteOptions
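
        Examples
        --------
        A minimal sketch; the compression codec is an illustrative choice:

        >>> import pyarrow.dataset as ds
        >>> fmt = ds.ParquetFileFormat()
        >>> write_options = fmt.make_write_options(compression="snappy")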
"""
        # Safeguard against calling make_write_options() as a static or
        # class method rather than on a ParquetFileFormat instance.
if not isinstance(self, ParquetFileFormat):
raise TypeError("make_write_options() should be called on "
"an instance of ParquetFileFormat")
opts = FileFormat.make_write_options(self)
(<ParquetFileWriteOptions> opts).update(**kwargs)
return opts

    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
if options.type_name == 'parquet':
self.parquet_format.default_fragment_scan_options = options.wrapped
else:
super()._set_default_fragment_scan_options(options)

    def equals(self, ParquetFileFormat other):
        """
        Return whether this ParquetFileFormat equals `other`.

        Parameters
        ----------
        other : pyarrow.dataset.ParquetFileFormat
            The format to compare against.

        Returns
        -------
        bool
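
        Examples
        --------
        >>> import pyarrow.dataset as ds
        >>> ds.ParquetFileFormat().equals(ds.ParquetFileFormat())
        True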
"""
return (
self.read_options.equals(other.read_options) and
self.default_fragment_scan_options ==
other.default_fragment_scan_options
)

    @property
    def default_extname(self):
        return "parquet"

    def __reduce__(self):
return ParquetFileFormat, (self.read_options,
self.default_fragment_scan_options)

    def __repr__(self):
        return f"<ParquetFileFormat read_options={self.read_options}>"

    def make_fragment(self, file, filesystem=None,
                      Expression partition_expression=None, row_groups=None,
                      *, file_size=None):
"""
        Make a FileFragment from a given file.

        Parameters
----------
file : file-like object, path-like or str
The file or file path to make a fragment from.
filesystem : Filesystem, optional
If `filesystem` is given, `file` must be a string and specifies
the path of the file to read from the filesystem.
partition_expression : Expression, optional
An expression that is guaranteed true for all rows in the fragment. Allows
fragment to be potentially skipped while scanning with a filter.
        row_groups : Iterable, optional
            The indices of the row groups to include.
        file_size : int, optional
            The size of the file in bytes. Can improve performance with
            high-latency filesystems when file size needs to be known
            before reading.

        Returns
-------
fragment : Fragment
The file fragment
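
        Examples
        --------
        A minimal sketch; the file path below is hypothetical:

        >>> import pyarrow.dataset as ds
        >>> from pyarrow import fs
        >>> local = fs.LocalFileSystem()
        >>> fmt = ds.ParquetFileFormat()
        >>> fragment = fmt.make_fragment(
        ...     "data/part-0.parquet", filesystem=local, row_groups=[0])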
"""
        cdef:
            vector[int] c_row_groups

        if partition_expression is None:
partition_expression = _true
if row_groups is None:
return super().make_fragment(file, filesystem,
partition_expression, file_size=file_size)
c_source = _make_file_source(file, filesystem, file_size)
c_row_groups = [<int> row_group for row_group in set(row_groups)]
c_fragment = <shared_ptr[CFragment]> GetResultValue(
self.parquet_format.MakeFragment(move(c_source),
partition_expression.unwrap(),
<shared_ptr[CSchema]>nullptr,
move(c_row_groups)))
return Fragment.wrap(move(c_fragment))


class RowGroupInfo:
"""
    A wrapper class for RowGroup information

    Parameters
    ----------
    id : int
        The row group ID.
    metadata : FileMetaData
        The row group metadata.
    schema : Schema
        Schema of the rows.
"""

    def __init__(self, id, metadata, schema):
self.id = id
self.metadata = metadata
self.schema = schema

    @property
def num_rows(self):
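        """Number of rows in this row group."""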
return self.metadata.num_rows

    @property
def total_byte_size(self):
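        """Total byte size of the uncompressed column data in this row group."""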
return self.metadata.total_byte_size

    @property
def statistics(self):
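        """
        Per-column statistics, as a dict mapping column names to
        ``{'min': ..., 'max': ...}``. Columns without usable min/max
        statistics are omitted.
        """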
def name_stats(i):
col = self.metadata.column(i)
stats = col.statistics
if stats is None or not stats.has_min_max:
return None, None
name = col.path_in_schema
field_index = self.schema.get_field_index(name)
if field_index < 0:
return None, None
typ = self.schema.field(field_index).type
return col.path_in_schema, {
'min': pa.scalar(stats.min, type=typ).as_py(),
'max': pa.scalar(stats.max, type=typ).as_py()
}

        return {
name: stats for name, stats
in map(name_stats, range(self.metadata.num_columns))
if stats is not None
}

    def __repr__(self):
return "RowGroupInfo({})".format(self.id)

    def __eq__(self, other):
if isinstance(other, int):
return self.id == other
        if not isinstance(other, RowGroupInfo):
            return False
        return self.id == other.id