# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# cython: profile=False
# distutils: language = c++
from collections.abc import Sequence
from textwrap import indent
import warnings
from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
check_status,
MemoryPool, maybe_unbox_memory_pool,
Table, KeyValueMetadata,
pyarrow_wrap_chunked_array,
pyarrow_wrap_schema,
pyarrow_unwrap_metadata,
pyarrow_unwrap_schema,
pyarrow_wrap_table,
pyarrow_wrap_batch,
pyarrow_wrap_scalar,
NativeFile, get_reader, get_writer,
string_to_timeunit)
from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
_stringify_path,
tobytes, frombytes, is_threading_enabled)
cimport cpython as cp
_DEFAULT_ROW_GROUP_SIZE = 1024*1024
_MAX_ROW_GROUP_SIZE = 64*1024*1024
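# _DEFAULT_ROW_GROUP_SIZE is the fallback used by ParquetWriter.write_table()
# when no row_group_size is given; _MAX_ROW_GROUP_SIZE is the hard cap applied
# below through WriterProperties.max_row_group_length().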
cdef class Statistics(_Weakrefable):
"""Statistics for a single column in a single row group."""
def __cinit__(self):
pass
def __repr__(self):
return """{}
has_min_max: {}
min: {}
max: {}
null_count: {}
distinct_count: {}
num_values: {}
physical_type: {}
logical_type: {}
converted_type (legacy): {}""".format(object.__repr__(self),
self.has_min_max,
self.min,
self.max,
self.null_count,
self.distinct_count,
self.num_values,
self.physical_type,
str(self.logical_type),
self.converted_type)
def to_dict(self):
"""
Get dictionary representation of statistics.
Returns
-------
dict
Dictionary with a key for each attribute of this class.
"""
d = dict(
has_min_max=self.has_min_max,
min=self.min,
max=self.max,
null_count=self.null_count,
distinct_count=self.distinct_count,
num_values=self.num_values,
physical_type=self.physical_type
)
return d
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, Statistics other):
"""
Return whether the two column statistics objects are equal.
Parameters
----------
other : Statistics
Statistics to compare against.
Returns
-------
are_equal : bool
"""
return self.statistics.get().Equals(deref(other.statistics.get()))
@property
def has_min_max(self):
"""Whether min and max are present (bool)."""
return self.statistics.get().HasMinMax()
@property
def has_null_count(self):
"""Whether null count is present (bool)."""
return self.statistics.get().HasNullCount()
@property
def has_distinct_count(self):
"""Whether distinct count is preset (bool)."""
return self.statistics.get().HasDistinctCount()
@property
def min_raw(self):
"""Min value as physical type (bool, int, float, or bytes)."""
if self.has_min_max:
return _cast_statistic_raw_min(self.statistics.get())
else:
return None
@property
def max_raw(self):
"""Max value as physical type (bool, int, float, or bytes)."""
if self.has_min_max:
return _cast_statistic_raw_max(self.statistics.get())
else:
return None
@property
def min(self):
"""
Min value as logical type.
Returned as the Python equivalent of logical type, such as datetime.date
for dates and decimal.Decimal for decimals.
"""
if self.has_min_max:
min_scalar, _ = _cast_statistics(self.statistics.get())
return min_scalar.as_py()
else:
return None
@property
def max(self):
"""
Max value as logical type.
Returned as the Python equivalent of logical type, such as datetime.date
for dates and decimal.Decimal for decimals.
"""
if self.has_min_max:
_, max_scalar = _cast_statistics(self.statistics.get())
return max_scalar.as_py()
else:
return None
@property
def null_count(self):
"""Number of null values in chunk (int)."""
if self.has_null_count:
return self.statistics.get().null_count()
else:
return None
@property
def distinct_count(self):
"""Distinct number of values in chunk (int)."""
if self.has_distinct_count:
return self.statistics.get().distinct_count()
else:
return None
@property
def num_values(self):
"""Number of non-null values (int)."""
return self.statistics.get().num_values()
@property
def physical_type(self):
"""Physical type of column (str)."""
raw_physical_type = self.statistics.get().physical_type()
return physical_type_name_from_enum(raw_physical_type)
@property
def logical_type(self):
"""Logical type of column (:class:`ParquetLogicalType`)."""
return wrap_logical_type(self.statistics.get().descr().logical_type())
@property
def converted_type(self):
"""Legacy converted type (str or None)."""
raw_converted_type = self.statistics.get().descr().converted_type()
return converted_type_name_from_enum(raw_converted_type)
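# Illustrative sketch (not executed here; the file path is hypothetical):
# Statistics objects are reached through the file metadata rather than
# constructed directly, e.g.
#
#   import pyarrow.parquet as pq
#   stats = pq.ParquetFile("data.parquet").metadata.row_group(0).column(0).statistics
#   stats.to_dict() if stats is not None else None
#
# Statistics may be absent when the writer did not record them.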
cdef class ParquetLogicalType(_Weakrefable):
"""Logical type of parquet type."""
cdef:
shared_ptr[const CParquetLogicalType] type
def __cinit__(self):
pass
cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
self.type = type
def __repr__(self):
return "{}\n {}".format(object.__repr__(self), str(self))
def __str__(self):
return frombytes(self.type.get().ToString(), safe=True)
def to_json(self):
"""
Get a JSON string containing type and type parameters.
Returns
-------
json : str
JSON representation of type, with at least a field called 'Type'
which contains the type name. If the type is parameterized, such
as a decimal with scale and precision, will contain those as fields
as well.
"""
return frombytes(self.type.get().ToJSON())
@property
def type(self):
"""Name of the logical type (str)."""
return logical_type_name_from_enum(self.type.get().type())
cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
cdef ParquetLogicalType out = ParquetLogicalType()
out.init(type)
return out
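# The helpers below downcast the generic CStatistics pointer to the typed
# statistics class matching the column's physical type and return min/max as
# plain Python values; BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY values are boxed
# into Python bytes objects of the appropriate length.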
cdef _cast_statistic_raw_min(CStatistics* statistics):
cdef ParquetType physical_type = statistics.physical_type()
cdef uint32_t type_length = statistics.descr().type_length()
if physical_type == ParquetType_BOOLEAN:
return (<CBoolStatistics*> statistics).min()
elif physical_type == ParquetType_INT32:
return (<CInt32Statistics*> statistics).min()
elif physical_type == ParquetType_INT64:
return (<CInt64Statistics*> statistics).min()
elif physical_type == ParquetType_FLOAT:
return (<CFloatStatistics*> statistics).min()
elif physical_type == ParquetType_DOUBLE:
return (<CDoubleStatistics*> statistics).min()
elif physical_type == ParquetType_BYTE_ARRAY:
return _box_byte_array((<CByteArrayStatistics*> statistics).min())
elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)
cdef _cast_statistic_raw_max(CStatistics* statistics):
cdef ParquetType physical_type = statistics.physical_type()
cdef uint32_t type_length = statistics.descr().type_length()
if physical_type == ParquetType_BOOLEAN:
return (<CBoolStatistics*> statistics).max()
elif physical_type == ParquetType_INT32:
return (<CInt32Statistics*> statistics).max()
elif physical_type == ParquetType_INT64:
return (<CInt64Statistics*> statistics).max()
elif physical_type == ParquetType_FLOAT:
return (<CFloatStatistics*> statistics).max()
elif physical_type == ParquetType_DOUBLE:
return (<CDoubleStatistics*> statistics).max()
elif physical_type == ParquetType_BYTE_ARRAY:
return _box_byte_array((<CByteArrayStatistics*> statistics).max())
elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)
cdef _cast_statistics(CStatistics* statistics):
cdef:
shared_ptr[CScalar] c_min
shared_ptr[CScalar] c_max
check_status(StatisticsAsScalars(statistics[0], &c_min, &c_max))
return (pyarrow_wrap_scalar(c_min), pyarrow_wrap_scalar(c_max))
cdef _box_byte_array(ParquetByteArray val):
return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> val.len)
cdef _box_flba(ParquetFLBA val, uint32_t len):
return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
cdef class ColumnChunkMetaData(_Weakrefable):
"""Column metadata for a single row group."""
def __cinit__(self):
pass
def __repr__(self):
statistics = indent(repr(self.statistics), 4 * ' ')
return """{0}
file_offset: {1}
file_path: {2}
physical_type: {3}
num_values: {4}
path_in_schema: {5}
is_stats_set: {6}
statistics:
{7}
compression: {8}
encodings: {9}
has_dictionary_page: {10}
dictionary_page_offset: {11}
data_page_offset: {12}
total_compressed_size: {13}
total_uncompressed_size: {14}""".format(object.__repr__(self),
self.file_offset,
self.file_path,
self.physical_type,
self.num_values,
self.path_in_schema,
self.is_stats_set,
statistics,
self.compression,
self.encodings,
self.has_dictionary_page,
self.dictionary_page_offset,
self.data_page_offset,
self.total_compressed_size,
self.total_uncompressed_size)
def to_dict(self):
"""
Get dictionary representation of the column chunk metadata.
Returns
-------
dict
Dictionary with a key for each attribute of this class.
"""
statistics = self.statistics.to_dict() if self.is_stats_set else None
d = dict(
file_offset=self.file_offset,
file_path=self.file_path,
physical_type=self.physical_type,
num_values=self.num_values,
path_in_schema=self.path_in_schema,
is_stats_set=self.is_stats_set,
statistics=statistics,
compression=self.compression,
encodings=self.encodings,
has_dictionary_page=self.has_dictionary_page,
dictionary_page_offset=self.dictionary_page_offset,
data_page_offset=self.data_page_offset,
total_compressed_size=self.total_compressed_size,
total_uncompressed_size=self.total_uncompressed_size
)
return d
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, ColumnChunkMetaData other):
"""
Return whether the two column chunk metadata objects are equal.
Parameters
----------
other : ColumnChunkMetaData
Metadata to compare against.
Returns
-------
are_equal : bool
"""
return self.metadata.Equals(deref(other.metadata))
@property
def file_offset(self):
"""Offset into file where column chunk is located (int)."""
return self.metadata.file_offset()
@property
def file_path(self):
"""Optional file path if set (str or None)."""
return frombytes(self.metadata.file_path())
@property
def physical_type(self):
"""Physical type of column (str)."""
return physical_type_name_from_enum(self.metadata.type())
@property
def num_values(self):
"""Total number of values (int)."""
return self.metadata.num_values()
@property
def path_in_schema(self):
"""Nested path to field, separated by periods (str)."""
path = self.metadata.path_in_schema().get().ToDotString()
return frombytes(path)
@property
def is_stats_set(self):
"""Whether or not statistics are present in metadata (bool)."""
return self.metadata.is_stats_set()
@property
def statistics(self):
"""Statistics for column chunk (:class:`Statistics`)."""
if not self.metadata.is_stats_set():
return None
statistics = Statistics()
statistics.init(self.metadata.statistics(), self)
return statistics
@property
def compression(self):
"""
Type of compression used for column (str).
One of 'UNCOMPRESSED', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4', 'ZSTD',
or 'UNKNOWN'.
"""
return compression_name_from_enum(self.metadata.compression())
@property
def encodings(self):
"""
Encodings used for column (tuple of str).
One of 'PLAIN', 'BIT_PACKED', 'RLE', 'BYTE_STREAM_SPLIT', 'DELTA_BINARY_PACKED',
'DELTA_LENGTH_BYTE_ARRAY', 'DELTA_BYTE_ARRAY'.
"""
return tuple(map(encoding_name_from_enum, self.metadata.encodings()))
@property
def has_dictionary_page(self):
"""Whether there is dictionary data present in the column chunk (bool)."""
return bool(self.metadata.has_dictionary_page())
@property
def dictionary_page_offset(self):
"""Offset of dictionary page relative to beginning of the file (int)."""
if self.has_dictionary_page:
return self.metadata.dictionary_page_offset()
else:
return None
@property
def data_page_offset(self):
"""Offset of data page relative to beginning of the file (int)."""
return self.metadata.data_page_offset()
@property
def has_index_page(self):
"""Not yet supported."""
raise NotImplementedError('not supported in parquet-cpp')
@property
def index_page_offset(self):
"""Not yet supported."""
raise NotImplementedError("parquet-cpp doesn't return valid values")
@property
def total_compressed_size(self):
"""Compressed size in bytes (int)."""
return self.metadata.total_compressed_size()
@property
def total_uncompressed_size(self):
"""Uncompressed size in bytes (int)."""
return self.metadata.total_uncompressed_size()
@property
def has_offset_index(self):
"""Whether the column chunk has an offset index"""
return self.metadata.GetOffsetIndexLocation().has_value()
@property
def has_column_index(self):
"""Whether the column chunk has a column index"""
return self.metadata.GetColumnIndexLocation().has_value()
@property
def metadata(self):
"""Additional metadata as key value pairs (dict[bytes, bytes])."""
cdef:
unordered_map[c_string, c_string] metadata
const CKeyValueMetadata* underlying_metadata
underlying_metadata = self.metadata.key_value_metadata().get()
if underlying_metadata != NULL:
underlying_metadata.ToUnorderedMap(&metadata)
return metadata
else:
return None
cdef class SortingColumn:
"""
Sorting specification for a single column.
Returned by :meth:`RowGroupMetaData.sorting_columns` and used in
:class:`ParquetWriter` to specify the sort order of the data.
Parameters
----------
column_index : int
Index of column that data is sorted by.
descending : bool, default False
Whether column is sorted in descending order.
nulls_first : bool, default False
Whether null values appear before valid values.
Notes
-----
Column indices are zero-based, refer only to leaf fields, and are in
depth-first order. This may make the column indices for nested schemas
different from what you expect. In most cases, it is easier to
specify the sort order using column names instead of column indices,
and to convert that ordering using the ``from_ordering`` method.
Examples
--------
In other APIs, sort order is specified by names, such as:
>>> sort_order = [('id', 'ascending'), ('timestamp', 'descending')]
For Parquet, the column index must be used instead:
>>> import pyarrow.parquet as pq
>>> [pq.SortingColumn(0), pq.SortingColumn(1, descending=True)]
[SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False)]
Convert the sort_order into the list of sorting columns with
``from_ordering`` (note that the schema must be provided as well):
>>> import pyarrow as pa
>>> schema = pa.schema([('id', pa.int64()), ('timestamp', pa.timestamp('ms'))])
>>> sorting_columns = pq.SortingColumn.from_ordering(schema, sort_order)
>>> sorting_columns
(SortingColumn(column_index=0, descending=False, nulls_first=False), SortingColumn(column_index=1, descending=True, nulls_first=False))
Convert back to the sort order with ``to_ordering``:
>>> pq.SortingColumn.to_ordering(schema, sorting_columns)
((('id', 'ascending'), ('timestamp', 'descending')), 'at_end')
See Also
--------
RowGroupMetaData.sorting_columns
"""
cdef int column_index
cdef c_bool descending
cdef c_bool nulls_first
def __init__(self, int column_index, c_bool descending=False, c_bool nulls_first=False):
self.column_index = column_index
self.descending = descending
self.nulls_first = nulls_first
@classmethod
def from_ordering(cls, Schema schema, sort_keys, null_placement='at_end'):
"""
Create a tuple of SortingColumn objects from the same arguments as
:class:`pyarrow.compute.SortOptions`.
Parameters
----------
schema : Schema
Schema of the input data.
sort_keys : Sequence of (name, order) tuples
Names of field/column keys (str) to sort the input on,
along with the order each field/column is sorted in.
Accepted values for `order` are "ascending", "descending".
null_placement : {'at_start', 'at_end'}, default 'at_end'
Where null values should appear in the sort order.
Returns
-------
sorting_columns : tuple of SortingColumn
"""
if null_placement == 'at_start':
nulls_first = True
elif null_placement == 'at_end':
nulls_first = False
else:
raise ValueError('null_placement must be "at_start" or "at_end"')
col_map = _name_to_index_map(schema)
sorting_columns = []
for sort_key in sort_keys:
if isinstance(sort_key, str):
name = sort_key
descending = False
elif (isinstance(sort_key, tuple) and len(sort_key) == 2 and
isinstance(sort_key[0], str) and
isinstance(sort_key[1], str)):
name, descending = sort_key
if descending == "descending":
descending = True
elif descending == "ascending":
descending = False
else:
raise ValueError("Invalid sort key direction: {0}"
.format(descending))
else:
raise ValueError("Invalid sort key: {0}".format(sort_key))
try:
column_index = col_map[name]
except KeyError:
raise ValueError("Sort key name '{0}' not found in schema:\n{1}"
.format(name, schema))
sorting_columns.append(
cls(column_index, descending=descending, nulls_first=nulls_first)
)
return tuple(sorting_columns)
@staticmethod
def to_ordering(Schema schema, sorting_columns):
"""
Convert a tuple of SortingColumn objects to the same format as
:class:`pyarrow.compute.SortOptions`.
Parameters
----------
schema : Schema
Schema of the input data.
sorting_columns : tuple of SortingColumn
Columns to sort the input on.
Returns
-------
sort_keys : tuple of (name, order) tuples
null_placement : {'at_start', 'at_end'}
"""
col_map = {i: name for name, i in _name_to_index_map(schema).items()}
sort_keys = []
nulls_first = None
for sorting_column in sorting_columns:
name = col_map[sorting_column.column_index]
if sorting_column.descending:
order = "descending"
else:
order = "ascending"
sort_keys.append((name, order))
if nulls_first is None:
nulls_first = sorting_column.nulls_first
elif nulls_first != sorting_column.nulls_first:
raise ValueError("Sorting columns have inconsistent null placement")
if nulls_first:
null_placement = "at_start"
else:
null_placement = "at_end"
return tuple(sort_keys), null_placement
def __repr__(self):
return """{}(column_index={}, descending={}, nulls_first={})""".format(
self.__class__.__name__,
self.column_index, self.descending, self.nulls_first)
def __eq__(self, SortingColumn other):
return (self.column_index == other.column_index and
self.descending == other.descending and
self.nulls_first == other.nulls_first)
def __hash__(self):
return hash((self.column_index, self.descending, self.nulls_first))
@property
def column_index(self):
""""Index of column data is sorted by (int)."""
return self.column_index
@property
def descending(self):
"""Whether column is sorted in descending order (bool)."""
return self.descending
@property
def nulls_first(self):
"""Whether null values appear before valid values (bool)."""
return self.nulls_first
def to_dict(self):
"""
Get dictionary representation of the SortingColumn.
Returns
-------
dict
Dictionary with a key for each attribute of this class.
"""
d = dict(
column_index=self.column_index,
descending=self.descending,
nulls_first=self.nulls_first
)
return d
cdef class RowGroupMetaData(_Weakrefable):
"""Metadata for a single row group."""
def __cinit__(self, FileMetaData parent, int index):
if index < 0 or index >= parent.num_row_groups:
raise IndexError('{0} out of bounds'.format(index))
self.up_metadata = parent._metadata.RowGroup(index)
self.metadata = self.up_metadata.get()
self.parent = parent
self.index = index
def __reduce__(self):
return RowGroupMetaData, (self.parent, self.index)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, RowGroupMetaData other):
"""
Return whether the two row group metadata objects are equal.
Parameters
----------
other : RowGroupMetaData
Metadata to compare against.
Returns
-------
are_equal : bool
"""
return self.metadata.Equals(deref(other.metadata))
def column(self, int i):
"""
Get column metadata at given index.
Parameters
----------
i : int
Index of column to get metadata for.
Returns
-------
ColumnChunkMetaData
Metadata for column within this chunk.
"""
if i < 0 or i >= self.num_columns:
raise IndexError('{0} out of bounds'.format(i))
chunk = ColumnChunkMetaData()
chunk.init(self, i)
return chunk
def __repr__(self):
return """{0}
num_columns: {1}
num_rows: {2}
total_byte_size: {3}
sorting_columns: {4}""".format(object.__repr__(self),
self.num_columns,
self.num_rows,
self.total_byte_size,
self.sorting_columns)
def to_dict(self):
"""
Get dictionary representation of the row group metadata.
Returns
-------
dict
Dictionary with a key for each attribute of this class.
"""
columns = []
d = dict(
num_columns=self.num_columns,
num_rows=self.num_rows,
total_byte_size=self.total_byte_size,
columns=columns,
sorting_columns=[col.to_dict() for col in self.sorting_columns]
)
for i in range(self.num_columns):
columns.append(self.column(i).to_dict())
return d
@property
def num_columns(self):
"""Number of columns in this row group (int)."""
return self.metadata.num_columns()
@property
def num_rows(self):
"""Number of rows in this row group (int)."""
return self.metadata.num_rows()
@property
def total_byte_size(self):
"""Total byte size of all the uncompressed column data in this row group (int)."""
return self.metadata.total_byte_size()
@property
def sorting_columns(self):
"""Columns the row group is sorted by (tuple of :class:`SortingColumn`))."""
out = []
cdef vector[CSortingColumn] sorting_columns = self.metadata.sorting_columns()
for sorting_col in sorting_columns:
out.append(SortingColumn(
sorting_col.column_idx,
sorting_col.descending,
sorting_col.nulls_first
))
return tuple(out)
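# Module-level helper used as the pickle reconstructor for FileMetaData:
# FileMetaData.__reduce__ serializes the Thrift footer into a Buffer and this
# function re-parses it on unpickling.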
def _reconstruct_filemetadata(Buffer serialized):
cdef:
FileMetaData metadata = FileMetaData.__new__(FileMetaData)
CBuffer *buffer = serialized.buffer.get()
uint32_t metadata_len = <uint32_t>buffer.size()
metadata.init(CFileMetaData_Make(buffer.data(), &metadata_len))
return metadata
cdef class FileMetaData(_Weakrefable):
"""Parquet metadata for a single file."""
def __cinit__(self):
pass
def __reduce__(self):
cdef:
NativeFile sink = BufferOutputStream()
COutputStream* c_sink = sink.get_output_stream().get()
with nogil:
self._metadata.WriteTo(c_sink)
cdef Buffer buffer = sink.getvalue()
return _reconstruct_filemetadata, (buffer,)
def __hash__(self):
return hash((self.schema,
self.num_rows,
self.num_row_groups,
self.format_version,
self.serialized_size))
def __repr__(self):
return """{0}
created_by: {1}
num_columns: {2}
num_rows: {3}
num_row_groups: {4}
format_version: {5}
serialized_size: {6}""".format(object.__repr__(self),
self.created_by, self.num_columns,
self.num_rows, self.num_row_groups,
self.format_version,
self.serialized_size)
def to_dict(self):
"""
Get dictionary representation of the file metadata.
Returns
-------
dict
Dictionary with a key for each attribute of this class.
"""
row_groups = []
d = dict(
created_by=self.created_by,
num_columns=self.num_columns,
num_rows=self.num_rows,
num_row_groups=self.num_row_groups,
row_groups=row_groups,
format_version=self.format_version,
serialized_size=self.serialized_size
)
for i in range(self.num_row_groups):
row_groups.append(self.row_group(i).to_dict())
return d
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, FileMetaData other not None):
"""
Return whether the two file metadata objects are equal.
Parameters
----------
other : FileMetaData
Metadata to compare against.
Returns
-------
are_equal : bool
"""
return self._metadata.Equals(deref(other._metadata))
@property
def schema(self):
"""Schema of the file (:class:`ParquetSchema`)."""
if self._schema is None:
self._schema = ParquetSchema(self)
return self._schema
@property
def serialized_size(self):
"""Size of the original thrift encoded metadata footer (int)."""
return self._metadata.size()
@property
def num_columns(self):
"""Number of columns in file (int)."""
return self._metadata.num_columns()
@property
def num_rows(self):
"""Total number of rows in file (int)."""
return self._metadata.num_rows()
@property
def num_row_groups(self):
"""Number of row groups in file (int)."""
return self._metadata.num_row_groups()
@property
def format_version(self):
"""
Parquet format version used in file (str, such as '1.0', '2.4').
If version is missing or unparsable, will default to assuming '2.6'.
"""
cdef ParquetVersion version = self._metadata.version()
if version == ParquetVersion_V1:
return '1.0'
elif version == ParquetVersion_V2_0:
return 'pseudo-2.0'
elif version == ParquetVersion_V2_4:
return '2.4'
elif version == ParquetVersion_V2_6:
return '2.6'
else:
warnings.warn('Unrecognized file version, assuming 2.6: {}'
.format(version))
return '2.6'
@property
def created_by(self):
"""
String describing source of the parquet file (str).
This typically includes library name and version number. For example, Arrow 7.0's
writer returns 'parquet-cpp-arrow version 7.0.0'.
"""
return frombytes(self._metadata.created_by())
@property
def metadata(self):
"""Additional metadata as key value pairs (dict[bytes, bytes])."""
cdef:
unordered_map[c_string, c_string] metadata
const CKeyValueMetadata* underlying_metadata
underlying_metadata = self._metadata.key_value_metadata().get()
if underlying_metadata != NULL:
underlying_metadata.ToUnorderedMap(&metadata)
return metadata
else:
return None
def row_group(self, int i):
"""
Get metadata for row group at index i.
Parameters
----------
i : int
Row group index to get.
Returns
-------
row_group_metadata : RowGroupMetaData
"""
return RowGroupMetaData(self, i)
def set_file_path(self, path):
"""
Set ColumnChunk file paths to the given value.
This method modifies the ``file_path`` field of each ColumnChunk
in the FileMetaData to be a particular value.
Parameters
----------
path : str
The file path to set on all ColumnChunks.
"""
cdef:
c_string c_path = tobytes(path)
self._metadata.set_file_path(c_path)
def append_row_groups(self, FileMetaData other):
"""
Append row groups from other FileMetaData object.
Parameters
----------
other : FileMetaData
Other metadata to append row groups from.
"""
cdef shared_ptr[CFileMetaData] c_metadata
c_metadata = other.sp_metadata
self._metadata.AppendRowGroups(deref(c_metadata))
def write_metadata_file(self, where):
"""
Write the metadata to a metadata-only Parquet file.
Parameters
----------
where : path or file-like object
Where to write the metadata. Should be a writable path on
the local filesystem, or a writable file-like object.
"""
cdef:
shared_ptr[COutputStream] sink
c_string c_where
try:
where = _stringify_path(where)
except TypeError:
get_writer(where, &sink)
else:
c_where = tobytes(where)
with nogil:
sink = GetResultValue(FileOutputStream.Open(c_where))
with nogil:
check_status(
WriteMetaDataFile(deref(self._metadata), sink.get()))
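# Illustrative sketch (assumes the public pyarrow.parquet API; the paths are
# hypothetical): the methods above support building a "_metadata" sidecar by
# accumulating per-file metadata, rewriting the file paths, and concatenating
# the row groups before writing a metadata-only file, e.g.
#
#   md = pq.ParquetFile("part-0.parquet").metadata
#   md.set_file_path("part-0.parquet")
#   other = pq.ParquetFile("part-1.parquet").metadata
#   other.set_file_path("part-1.parquet")
#   md.append_row_groups(other)
#   md.write_metadata_file("_metadata")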
cdef class ParquetSchema(_Weakrefable):
"""A Parquet schema."""
def __cinit__(self, FileMetaData container):
self.parent = container
self.schema = container._metadata.schema()
def __repr__(self):
return "{0}\n{1}".format(
object.__repr__(self),
frombytes(self.schema.ToString(), safe=True))
def __reduce__(self):
return ParquetSchema, (self.parent,)
def __len__(self):
return self.schema.num_columns()
def __getitem__(self, i):
return self.column(i)
def __hash__(self):
return hash(self.schema.ToString())
@property
def names(self):
"""Name of each field (list of str)."""
return [self[i].name for i in range(len(self))]
def to_arrow_schema(self):
"""
Convert Parquet schema to effective Arrow schema.
Returns
-------
schema : Schema
"""
cdef shared_ptr[CSchema] sp_arrow_schema
with nogil:
check_status(FromParquetSchema(
self.schema, default_arrow_reader_properties(),
self.parent._metadata.key_value_metadata(),
&sp_arrow_schema))
return pyarrow_wrap_schema(sp_arrow_schema)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def equals(self, ParquetSchema other):
"""
Return whether the two schemas are equal.
Parameters
----------
other : ParquetSchema
Schema to compare against.
Returns
-------
are_equal : bool
"""
return self.schema.Equals(deref(other.schema))
def column(self, i):
"""
Return the schema for a single column.
Parameters
----------
i : int
Index of column in schema.
Returns
-------
column_schema : ColumnSchema
"""
if i < 0 or i >= len(self):
raise IndexError('{0} out of bounds'.format(i))
return ColumnSchema(self, i)
cdef class ColumnSchema(_Weakrefable):
"""Schema for a single column."""
cdef:
int index
ParquetSchema parent
const ColumnDescriptor* descr
def __cinit__(self, ParquetSchema schema, int index):
self.parent = schema
self.index = index # for pickling support
self.descr = schema.schema.Column(index)
def __eq__(self, other):
try:
return self.equals(other)
except TypeError:
return NotImplemented
def __reduce__(self):
return ColumnSchema, (self.parent, self.index)
def equals(self, ColumnSchema other):
"""
Return whether the two column schemas are equal.
Parameters
----------
other : ColumnSchema
Schema to compare against.
Returns
-------
are_equal : bool
"""
return self.descr.Equals(deref(other.descr))
def __repr__(self):
physical_type = self.physical_type
converted_type = self.converted_type
if converted_type == 'DECIMAL':
converted_type = 'DECIMAL({0}, {1})'.format(self.precision,
self.scale)
elif physical_type == 'FIXED_LEN_BYTE_ARRAY':
converted_type = ('FIXED_LEN_BYTE_ARRAY(length={0})'
.format(self.length))
return """<ParquetColumnSchema>
name: {0}
path: {1}
max_definition_level: {2}
max_repetition_level: {3}
physical_type: {4}
logical_type: {5}
converted_type (legacy): {6}""".format(self.name, self.path,
self.max_definition_level,
self.max_repetition_level,
physical_type,
str(self.logical_type),
converted_type)
@property
def name(self):
"""Name of field (str)."""
return frombytes(self.descr.name())
@property
def path(self):
"""Nested path to field, separated by periods (str)."""
return frombytes(self.descr.path().get().ToDotString())
@property
def max_definition_level(self):
"""Maximum definition level (int)."""
return self.descr.max_definition_level()
@property
def max_repetition_level(self):
"""Maximum repetition level (int)."""
return self.descr.max_repetition_level()
@property
def physical_type(self):
"""Name of physical type (str)."""
return physical_type_name_from_enum(self.descr.physical_type())
@property
def logical_type(self):
"""Logical type of column (:class:`ParquetLogicalType`)."""
return wrap_logical_type(self.descr.logical_type())
@property
def converted_type(self):
"""Legacy converted type (str or None)."""
return converted_type_name_from_enum(self.descr.converted_type())
# FIXED_LEN_BYTE_ARRAY attribute
@property
def length(self):
"""Array length if fixed length byte array type, None otherwise (int or None)."""
return self.descr.type_length()
# Decimal attributes
@property
def precision(self):
"""Precision if decimal type, None otherwise (int or None)."""
return self.descr.type_precision()
@property
def scale(self):
"""Scale if decimal type, None otherwise (int or None)."""
return self.descr.type_scale()
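# The helpers below map Parquet C++ enum values to the string names exposed by
# the classes above; values that are not recognized fall back to 'UNKNOWN'.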
cdef physical_type_name_from_enum(ParquetType type_):
return {
ParquetType_BOOLEAN: 'BOOLEAN',
ParquetType_INT32: 'INT32',
ParquetType_INT64: 'INT64',
ParquetType_INT96: 'INT96',
ParquetType_FLOAT: 'FLOAT',
ParquetType_DOUBLE: 'DOUBLE',
ParquetType_BYTE_ARRAY: 'BYTE_ARRAY',
ParquetType_FIXED_LEN_BYTE_ARRAY: 'FIXED_LEN_BYTE_ARRAY',
}.get(type_, 'UNKNOWN')
cdef logical_type_name_from_enum(ParquetLogicalTypeId type_):
return {
ParquetLogicalType_UNDEFINED: 'UNDEFINED',
ParquetLogicalType_STRING: 'STRING',
ParquetLogicalType_MAP: 'MAP',
ParquetLogicalType_LIST: 'LIST',
ParquetLogicalType_ENUM: 'ENUM',
ParquetLogicalType_DECIMAL: 'DECIMAL',
ParquetLogicalType_DATE: 'DATE',
ParquetLogicalType_TIME: 'TIME',
ParquetLogicalType_TIMESTAMP: 'TIMESTAMP',
ParquetLogicalType_INT: 'INT',
ParquetLogicalType_FLOAT16: 'FLOAT16',
ParquetLogicalType_JSON: 'JSON',
ParquetLogicalType_BSON: 'BSON',
ParquetLogicalType_UUID: 'UUID',
ParquetLogicalType_NONE: 'NONE',
}.get(type_, 'UNKNOWN')
cdef converted_type_name_from_enum(ParquetConvertedType type_):
return {
ParquetConvertedType_NONE: 'NONE',
ParquetConvertedType_UTF8: 'UTF8',
ParquetConvertedType_MAP: 'MAP',
ParquetConvertedType_MAP_KEY_VALUE: 'MAP_KEY_VALUE',
ParquetConvertedType_LIST: 'LIST',
ParquetConvertedType_ENUM: 'ENUM',
ParquetConvertedType_DECIMAL: 'DECIMAL',
ParquetConvertedType_DATE: 'DATE',
ParquetConvertedType_TIME_MILLIS: 'TIME_MILLIS',
ParquetConvertedType_TIME_MICROS: 'TIME_MICROS',
ParquetConvertedType_TIMESTAMP_MILLIS: 'TIMESTAMP_MILLIS',
ParquetConvertedType_TIMESTAMP_MICROS: 'TIMESTAMP_MICROS',
ParquetConvertedType_UINT_8: 'UINT_8',
ParquetConvertedType_UINT_16: 'UINT_16',
ParquetConvertedType_UINT_32: 'UINT_32',
ParquetConvertedType_UINT_64: 'UINT_64',
ParquetConvertedType_INT_8: 'INT_8',
ParquetConvertedType_INT_16: 'INT_16',
ParquetConvertedType_INT_32: 'INT_32',
ParquetConvertedType_INT_64: 'INT_64',
ParquetConvertedType_JSON: 'JSON',
ParquetConvertedType_BSON: 'BSON',
ParquetConvertedType_INTERVAL: 'INTERVAL',
}.get(type_, 'UNKNOWN')
cdef encoding_name_from_enum(ParquetEncoding encoding_):
return {
ParquetEncoding_PLAIN: 'PLAIN',
ParquetEncoding_PLAIN_DICTIONARY: 'PLAIN_DICTIONARY',
ParquetEncoding_RLE: 'RLE',
ParquetEncoding_BIT_PACKED: 'BIT_PACKED',
ParquetEncoding_DELTA_BINARY_PACKED: 'DELTA_BINARY_PACKED',
ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY: 'DELTA_LENGTH_BYTE_ARRAY',
ParquetEncoding_DELTA_BYTE_ARRAY: 'DELTA_BYTE_ARRAY',
ParquetEncoding_RLE_DICTIONARY: 'RLE_DICTIONARY',
ParquetEncoding_BYTE_STREAM_SPLIT: 'BYTE_STREAM_SPLIT',
}.get(encoding_, 'UNKNOWN')
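# Reverse mapping used to validate user-supplied 'column_encoding' values.
# The two dictionary encodings map to the 'dict' sentinel so that requesting
# them explicitly raises: dictionary encoding is controlled through the
# 'use_dictionary' option instead.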
cdef encoding_enum_from_name(str encoding_name):
enc = {
'PLAIN': ParquetEncoding_PLAIN,
'BIT_PACKED': ParquetEncoding_BIT_PACKED,
'RLE': ParquetEncoding_RLE,
'BYTE_STREAM_SPLIT': ParquetEncoding_BYTE_STREAM_SPLIT,
'DELTA_BINARY_PACKED': ParquetEncoding_DELTA_BINARY_PACKED,
'DELTA_LENGTH_BYTE_ARRAY': ParquetEncoding_DELTA_LENGTH_BYTE_ARRAY,
'DELTA_BYTE_ARRAY': ParquetEncoding_DELTA_BYTE_ARRAY,
'RLE_DICTIONARY': 'dict',
'PLAIN_DICTIONARY': 'dict',
}.get(encoding_name, None)
if enc is None:
raise ValueError(f"Unsupported column encoding: {encoding_name!r}")
elif enc == 'dict':
raise ValueError(f"{encoding_name!r} is already used by default.")
else:
return enc
cdef compression_name_from_enum(ParquetCompression compression_):
return {
ParquetCompression_UNCOMPRESSED: 'UNCOMPRESSED',
ParquetCompression_SNAPPY: 'SNAPPY',
ParquetCompression_GZIP: 'GZIP',
ParquetCompression_LZO: 'LZO',
ParquetCompression_BROTLI: 'BROTLI',
ParquetCompression_LZ4: 'LZ4',
ParquetCompression_ZSTD: 'ZSTD',
}.get(compression_, 'UNKNOWN')
cdef int check_compression_name(name) except -1:
if name.upper() not in {'NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI', 'LZ4',
'ZSTD'}:
raise ArrowException("Unsupported compression: " + name)
return 0
cdef ParquetCompression compression_from_name(name):
name = name.upper()
if name == 'SNAPPY':
return ParquetCompression_SNAPPY
elif name == 'GZIP':
return ParquetCompression_GZIP
elif name == 'LZO':
return ParquetCompression_LZO
elif name == 'BROTLI':
return ParquetCompression_BROTLI
elif name == 'LZ4':
return ParquetCompression_LZ4
elif name == 'ZSTD':
return ParquetCompression_ZSTD
else:
return ParquetCompression_UNCOMPRESSED
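# Note: compression_from_name() falls back to UNCOMPRESSED for names it does
# not recognize; callers in this module validate first via
# check_compression_name().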
cdef class ParquetReader(_Weakrefable):
cdef:
object source
CMemoryPool* pool
UniquePtrNoGIL[FileReader] reader
FileMetaData _metadata
shared_ptr[CRandomAccessFile] rd_handle
cdef public:
_column_idx_map
def __cinit__(self, MemoryPool memory_pool=None):
self.pool = maybe_unbox_memory_pool(memory_pool)
self._metadata = None
def open(self, object source not None, *, bint use_memory_map=False,
read_dictionary=None, FileMetaData metadata=None,
int buffer_size=0, bint pre_buffer=False,
coerce_int96_timestamp_unit=None,
FileDecryptionProperties decryption_properties=None,
thrift_string_size_limit=None,
thrift_container_size_limit=None,
page_checksum_verification=False):
"""
Open a parquet file for reading.
Parameters
----------
source : str, pathlib.Path, pyarrow.NativeFile, or file-like object
use_memory_map : bool, default False
read_dictionary : iterable[int or str], optional
metadata : FileMetaData, optional
buffer_size : int, default 0
pre_buffer : bool, default False
coerce_int96_timestamp_unit : str, optional
decryption_properties : FileDecryptionProperties, optional
thrift_string_size_limit : int, optional
thrift_container_size_limit : int, optional
page_checksum_verification : bool, default False
"""
cdef:
shared_ptr[CFileMetaData] c_metadata
CReaderProperties properties = default_reader_properties()
ArrowReaderProperties arrow_props = (
default_arrow_reader_properties())
FileReaderBuilder builder
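# Pre-buffering reads ahead on background I/O threads, so it is disabled
# when the Arrow build has threading disabled.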
if pre_buffer and not is_threading_enabled():
pre_buffer = False
if metadata is not None:
c_metadata = metadata.sp_metadata
if buffer_size > 0:
properties.enable_buffered_stream()
properties.set_buffer_size(buffer_size)
elif buffer_size == 0:
properties.disable_buffered_stream()
else:
raise ValueError('Buffer size must be larger than zero')
if thrift_string_size_limit is not None:
if thrift_string_size_limit <= 0:
raise ValueError("thrift_string_size_limit "
"must be larger than zero")
properties.set_thrift_string_size_limit(thrift_string_size_limit)
if thrift_container_size_limit is not None:
if thrift_container_size_limit <= 0:
raise ValueError("thrift_container_size_limit "
"must be larger than zero")
properties.set_thrift_container_size_limit(
thrift_container_size_limit)
if decryption_properties is not None:
properties.file_decryption_properties(
decryption_properties.unwrap())
arrow_props.set_pre_buffer(pre_buffer)
properties.set_page_checksum_verification(page_checksum_verification)
if coerce_int96_timestamp_unit is None:
# use the default defined in default_arrow_reader_properties()
pass
else:
arrow_props.set_coerce_int96_timestamp_unit(
string_to_timeunit(coerce_int96_timestamp_unit))
self.source = source
get_reader(source, use_memory_map, &self.rd_handle)
with nogil:
check_status(builder.Open(self.rd_handle, properties, c_metadata))
# Set up metadata
with nogil:
c_metadata = builder.raw_reader().metadata()
self._metadata = result = FileMetaData()
result.init(c_metadata)
if read_dictionary is not None:
self._set_read_dictionary(read_dictionary, &arrow_props)
with nogil:
check_status(builder.memory_pool(self.pool)
.properties(arrow_props)
.Build(&self.reader))
cdef _set_read_dictionary(self, read_dictionary,
ArrowReaderProperties* props):
for column in read_dictionary:
if not isinstance(column, int):
column = self.column_name_idx(column)
props.set_read_dictionary(column, True)
@property
def column_paths(self):
cdef:
FileMetaData container = self.metadata
const CFileMetaData* metadata = container._metadata
vector[c_string] path
int i = 0
paths = []
for i in range(0, metadata.num_columns()):
path = (metadata.schema().Column(i)
.path().get().ToDotVector())
paths.append([frombytes(x) for x in path])
return paths
@property
def metadata(self):
return self._metadata
@property
def schema_arrow(self):
cdef shared_ptr[CSchema] out
with nogil:
check_status(self.reader.get().GetSchema(&out))
return pyarrow_wrap_schema(out)
@property
def num_row_groups(self):
return self.reader.get().num_row_groups()
def set_use_threads(self, bint use_threads):
"""
Parameters
----------
use_threads : bool
"""
if is_threading_enabled():
self.reader.get().set_use_threads(use_threads)
else:
self.reader.get().set_use_threads(False)
def set_batch_size(self, int64_t batch_size):
"""
Parameters
----------
batch_size : int64
"""
self.reader.get().set_batch_size(batch_size)
def iter_batches(self, int64_t batch_size, row_groups, column_indices=None,
bint use_threads=True):
"""
Parameters
----------
batch_size : int64
row_groups : list[int]
column_indices : list[int], optional
use_threads : bool, default True
Yields
------
next : RecordBatch
"""
cdef:
vector[int] c_row_groups
vector[int] c_column_indices
shared_ptr[CRecordBatch] record_batch
UniquePtrNoGIL[CRecordBatchReader] recordbatchreader
self.set_batch_size(batch_size)
if use_threads:
self.set_use_threads(use_threads)
for row_group in row_groups:
c_row_groups.push_back(row_group)
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
with nogil:
recordbatchreader = GetResultValue(
self.reader.get().GetRecordBatchReader(
c_row_groups, c_column_indices
)
)
else:
with nogil:
recordbatchreader = GetResultValue(
self.reader.get().GetRecordBatchReader(
c_row_groups
)
)
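# Drain the reader: ReadNext() yields a NULL batch once the stream is
# exhausted.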
while True:
with nogil:
check_status(
recordbatchreader.get().ReadNext(&record_batch)
)
if record_batch.get() == NULL:
break
yield pyarrow_wrap_batch(record_batch)
def read_row_group(self, int i, column_indices=None,
bint use_threads=True):
"""
Parameters
----------
i : int
column_indices : list[int], optional
use_threads : bool, default True
Returns
-------
table : pyarrow.Table
"""
return self.read_row_groups([i], column_indices, use_threads)
def read_row_groups(self, row_groups not None, column_indices=None,
bint use_threads=True):
"""
Parameters
----------
row_groups : list[int]
column_indices : list[int], optional
use_threads : bool, default True
Returns
-------
table : pyarrow.Table
"""
cdef:
shared_ptr[CTable] ctable
vector[int] c_row_groups
vector[int] c_column_indices
self.set_use_threads(use_threads)
for row_group in row_groups:
c_row_groups.push_back(row_group)
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
with nogil:
check_status(self.reader.get()
.ReadRowGroups(c_row_groups, c_column_indices,
&ctable))
else:
# Read all columns
with nogil:
check_status(self.reader.get()
.ReadRowGroups(c_row_groups, &ctable))
return pyarrow_wrap_table(ctable)
def read_all(self, column_indices=None, bint use_threads=True):
"""
Parameters
----------
column_indices : list[int], optional
use_threads : bool, default True
Returns
-------
table : pyarrow.Table
"""
cdef:
shared_ptr[CTable] ctable
vector[int] c_column_indices
self.set_use_threads(use_threads)
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
with nogil:
check_status(self.reader.get()
.ReadTable(c_column_indices, &ctable))
else:
# Read all columns
with nogil:
check_status(self.reader.get()
.ReadTable(&ctable))
return pyarrow_wrap_table(ctable)
def scan_contents(self, column_indices=None, batch_size=65536):
"""
Parameters
----------
column_indices : list[int], optional
batch_size : int32, default 65536
Returns
-------
num_rows : int64
"""
cdef:
vector[int] c_column_indices
int32_t c_batch_size
int64_t c_num_rows
if column_indices is not None:
for index in column_indices:
c_column_indices.push_back(index)
c_batch_size = batch_size
with nogil:
check_status(self.reader.get()
.ScanContents(c_column_indices, c_batch_size,
&c_num_rows))
return c_num_rows
def column_name_idx(self, column_name):
"""
Find the index of a column by its name.
Parameters
----------
column_name : str
Name of the column; separation of nesting levels is done via ".".
Returns
-------
column_idx : int
Integer index of the column in the schema.
"""
cdef:
FileMetaData container = self.metadata
const CFileMetaData* metadata = container._metadata
int i = 0
if self._column_idx_map is None:
self._column_idx_map = {}
for i in range(0, metadata.num_columns()):
col_bytes = tobytes(metadata.schema().Column(i)
.path().get().ToDotString())
self._column_idx_map[col_bytes] = i
return self._column_idx_map[tobytes(column_name)]
def read_column(self, int column_index):
"""
Read the column at the specified index.
Parameters
----------
column_index : int
Index of the column.
Returns
-------
column : pyarrow.ChunkedArray
"""
cdef shared_ptr[CChunkedArray] out
with nogil:
check_status(self.reader.get()
.ReadColumn(column_index, &out))
return pyarrow_wrap_chunked_array(out)
def close(self):
if not self.closed:
with nogil:
check_status(self.rd_handle.get().Close())
@property
def closed(self):
if self.rd_handle == NULL:
return True
with nogil:
closed = self.rd_handle.get().closed()
return closed
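# Converters between the Python-level SortingColumn and the C++ CSortingColumn
# struct consumed by the writer properties below.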
cdef CSortingColumn _convert_sorting_column(SortingColumn sorting_column):
cdef CSortingColumn c_sorting_column
c_sorting_column.column_idx = sorting_column.column_index
c_sorting_column.descending = sorting_column.descending
c_sorting_column.nulls_first = sorting_column.nulls_first
return c_sorting_column
cdef vector[CSortingColumn] _convert_sorting_columns(sorting_columns) except *:
if not (isinstance(sorting_columns, Sequence)
and all(isinstance(col, SortingColumn) for col in sorting_columns)):
raise ValueError(
"'sorting_columns' must be a list of `SortingColumn`")
cdef vector[CSortingColumn] c_sorting_columns = [_convert_sorting_column(col)
for col in sorting_columns]
return c_sorting_columns
cdef shared_ptr[WriterProperties] _create_writer_properties(
use_dictionary=None,
compression=None,
version=None,
write_statistics=None,
data_page_size=None,
compression_level=None,
use_byte_stream_split=False,
column_encoding=None,
data_page_version=None,
FileEncryptionProperties encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
write_page_index=False,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False) except *:
"""General writer properties"""
cdef:
shared_ptr[WriterProperties] properties
WriterProperties.Builder props
# data_page_version
if data_page_version is not None:
if data_page_version == "1.0":
props.data_page_version(ParquetDataPageVersion_V1)
elif data_page_version == "2.0":
props.data_page_version(ParquetDataPageVersion_V2)
else:
raise ValueError("Unsupported Parquet data page version: {0}"
.format(data_page_version))
# version
if version is not None:
if version == "1.0":
props.version(ParquetVersion_V1)
elif version in ("2.0", "pseudo-2.0"):
warnings.warn(
"Parquet format '2.0' pseudo version is deprecated, use "
"'2.4' or '2.6' for fine-grained feature selection",
FutureWarning, stacklevel=2)
props.version(ParquetVersion_V2_0)
elif version == "2.4":
props.version(ParquetVersion_V2_4)
elif version == "2.6":
props.version(ParquetVersion_V2_6)
else:
raise ValueError("Unsupported Parquet format version: {0}"
.format(version))
# compression
if isinstance(compression, basestring):
check_compression_name(compression)
props.compression(compression_from_name(compression))
elif compression is not None:
for column, codec in compression.items():
check_compression_name(codec)
props.compression(tobytes(column), compression_from_name(codec))
if isinstance(compression_level, int):
props.compression_level(compression_level)
elif compression_level is not None:
for column, level in compression_level.items():
props.compression_level(tobytes(column), level)
# use_dictionary
if isinstance(use_dictionary, bool):
if use_dictionary:
props.enable_dictionary()
if column_encoding is not None:
raise ValueError(
"To use 'column_encoding' set 'use_dictionary' to False")
else:
props.disable_dictionary()
elif use_dictionary is not None:
# Deactivate dictionary encoding by default
props.disable_dictionary()
for column in use_dictionary:
props.enable_dictionary(tobytes(column))
if (column_encoding is not None and
column_encoding.get(column) is not None):
raise ValueError(
"To use 'column_encoding' set 'use_dictionary' to False")
# write_statistics
if isinstance(write_statistics, bool):
if write_statistics:
props.enable_statistics()
else:
props.disable_statistics()
elif write_statistics is not None:
# Deactivate statistics by default and enable for specified columns
props.disable_statistics()
for column in write_statistics:
props.enable_statistics(tobytes(column))
# sorting_columns
if sorting_columns is not None:
props.set_sorting_columns(_convert_sorting_columns(sorting_columns))
# use_byte_stream_split
if isinstance(use_byte_stream_split, bool):
if use_byte_stream_split:
if column_encoding is not None:
raise ValueError(
"'use_byte_stream_split' cannot be passed"
"together with 'column_encoding'")
else:
props.encoding(ParquetEncoding_BYTE_STREAM_SPLIT)
elif use_byte_stream_split is not None:
for column in use_byte_stream_split:
if column_encoding is None:
column_encoding = {column: 'BYTE_STREAM_SPLIT'}
elif column_encoding.get(column, None) is None:
column_encoding[column] = 'BYTE_STREAM_SPLIT'
else:
raise ValueError(
"'use_byte_stream_split' cannot be passed"
"together with 'column_encoding'")
# store_decimal_as_integer
if isinstance(store_decimal_as_integer, bool):
if store_decimal_as_integer:
props.enable_store_decimal_as_integer()
else:
props.disable_store_decimal_as_integer()
else:
raise TypeError("'store_decimal_as_integer' must be a boolean")
# column_encoding
# encoding map - encode individual columns
if column_encoding is not None:
if isinstance(column_encoding, dict):
for column, _encoding in column_encoding.items():
props.encoding(tobytes(column),
encoding_enum_from_name(_encoding))
elif isinstance(column_encoding, str):
props.encoding(encoding_enum_from_name(column_encoding))
else:
raise TypeError(
"'column_encoding' should be a dictionary or a string")
if data_page_size is not None:
props.data_pagesize(data_page_size)
if write_batch_size is not None:
props.write_batch_size(write_batch_size)
if dictionary_pagesize_limit is not None:
props.dictionary_pagesize_limit(dictionary_pagesize_limit)
# encryption
if encryption_properties is not None:
props.encryption(
(<FileEncryptionProperties>encryption_properties).unwrap())
# For backwards compatibility reasons we cap the maximum row group size
# at 64Mi rows. This could be changed in the future, though it would be
# a breaking change.
#
# The user can always specify a smaller row group size (and the default
# is smaller) when calling write_table. If the call to write_table uses
# a size larger than this then it will be latched to this value.
props.max_row_group_length(_MAX_ROW_GROUP_SIZE)
# checksum
if write_page_checksum:
props.enable_page_checksum()
else:
props.disable_page_checksum()
# page index
if write_page_index:
props.enable_write_page_index()
else:
props.disable_write_page_index()
properties = props.build()
return properties
cdef shared_ptr[ArrowWriterProperties] _create_arrow_writer_properties(
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
allow_truncated_timestamps=False,
writer_engine_version=None,
use_compliant_nested_type=True,
store_schema=True) except *:
"""Arrow writer properties"""
cdef:
shared_ptr[ArrowWriterProperties] arrow_properties
ArrowWriterProperties.Builder arrow_props
# Store the original Arrow schema so things like dictionary types can
# be automatically reconstructed
if store_schema:
arrow_props.store_schema()
# int96 support
if use_deprecated_int96_timestamps:
arrow_props.enable_deprecated_int96_timestamps()
else:
arrow_props.disable_deprecated_int96_timestamps()
# coerce_timestamps
if coerce_timestamps == 'ms':
arrow_props.coerce_timestamps(TimeUnit_MILLI)
elif coerce_timestamps == 'us':
arrow_props.coerce_timestamps(TimeUnit_MICRO)
elif coerce_timestamps is not None:
raise ValueError('Invalid value for coerce_timestamps: {0}'
.format(coerce_timestamps))
# allow_truncated_timestamps
if allow_truncated_timestamps:
arrow_props.allow_truncated_timestamps()
else:
arrow_props.disallow_truncated_timestamps()
# use_compliant_nested_type
if use_compliant_nested_type:
arrow_props.enable_compliant_nested_types()
else:
arrow_props.disable_compliant_nested_types()
# writer_engine_version
if writer_engine_version == "V1":
warnings.warn("V1 parquet writer engine is a no-op. Use V2.")
arrow_props.set_engine_version(ArrowWriterEngineVersion.V1)
elif writer_engine_version != "V2":
raise ValueError("Unsupported Writer Engine Version: {0}"
.format(writer_engine_version))
arrow_properties = arrow_props.build()
return arrow_properties
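# Build a {dotted leaf-column name: column index} map by converting the Arrow
# schema to its Parquet schema; SortingColumn.from_ordering/to_ordering use
# this to translate between column names and Parquet column indices.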
cdef _name_to_index_map(Schema arrow_schema):
cdef:
shared_ptr[CSchema] sp_arrow_schema
shared_ptr[SchemaDescriptor] sp_parquet_schema
shared_ptr[WriterProperties] props = _create_writer_properties()
shared_ptr[ArrowWriterProperties] arrow_props = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
allow_truncated_timestamps=False,
writer_engine_version="V2"
)
sp_arrow_schema = pyarrow_unwrap_schema(arrow_schema)
with nogil:
check_status(ToParquetSchema(
sp_arrow_schema.get(), deref(props.get()), deref(arrow_props.get()), &sp_parquet_schema))
out = dict()
cdef SchemaDescriptor* parquet_schema = sp_parquet_schema.get()
for i in range(parquet_schema.num_columns()):
name = frombytes(parquet_schema.Column(i).path().get().ToDotString())
out[name] = i
return out
cdef class ParquetWriter(_Weakrefable):
cdef:
unique_ptr[FileWriter] writer
shared_ptr[COutputStream] sink
bint own_sink
cdef readonly:
object use_dictionary
object use_deprecated_int96_timestamps
object use_byte_stream_split
object column_encoding
object coerce_timestamps
object allow_truncated_timestamps
object compression
object compression_level
object data_page_version
object use_compliant_nested_type
object version
object write_statistics
object writer_engine_version
int row_group_size
int64_t data_page_size
FileEncryptionProperties encryption_properties
int64_t write_batch_size
int64_t dictionary_pagesize_limit
object store_schema
object store_decimal_as_integer
def __cinit__(self, where, Schema schema not None, use_dictionary=None,
compression=None, version=None,
write_statistics=None,
MemoryPool memory_pool=None,
use_deprecated_int96_timestamps=False,
coerce_timestamps=None,
data_page_size=None,
allow_truncated_timestamps=False,
compression_level=None,
use_byte_stream_split=False,
column_encoding=None,
writer_engine_version=None,
data_page_version=None,
use_compliant_nested_type=True,
encryption_properties=None,
write_batch_size=None,
dictionary_pagesize_limit=None,
store_schema=True,
write_page_index=False,
write_page_checksum=False,
sorting_columns=None,
store_decimal_as_integer=False):
cdef:
shared_ptr[WriterProperties] properties
shared_ptr[ArrowWriterProperties] arrow_properties
c_string c_where
CMemoryPool* pool
try:
where = _stringify_path(where)
except TypeError:
get_writer(where, &self.sink)
self.own_sink = False
else:
c_where = tobytes(where)
with nogil:
self.sink = GetResultValue(FileOutputStream.Open(c_where))
self.own_sink = True
properties = _create_writer_properties(
use_dictionary=use_dictionary,
compression=compression,
version=version,
write_statistics=write_statistics,
data_page_size=data_page_size,
compression_level=compression_level,
use_byte_stream_split=use_byte_stream_split,
column_encoding=column_encoding,
data_page_version=data_page_version,
encryption_properties=encryption_properties,
write_batch_size=write_batch_size,
dictionary_pagesize_limit=dictionary_pagesize_limit,
write_page_index=write_page_index,
write_page_checksum=write_page_checksum,
sorting_columns=sorting_columns,
store_decimal_as_integer=store_decimal_as_integer,
)
arrow_properties = _create_arrow_writer_properties(
use_deprecated_int96_timestamps=use_deprecated_int96_timestamps,
coerce_timestamps=coerce_timestamps,
allow_truncated_timestamps=allow_truncated_timestamps,
writer_engine_version=writer_engine_version,
use_compliant_nested_type=use_compliant_nested_type,
store_schema=store_schema,
)
pool = maybe_unbox_memory_pool(memory_pool)
with nogil:
self.writer = move(GetResultValue(
FileWriter.Open(deref(schema.schema), pool,
self.sink, properties, arrow_properties)))
def close(self):
with nogil:
check_status(self.writer.get().Close())
if self.own_sink:
check_status(self.sink.get().Close())
def write_table(self, Table table, row_group_size=None):
cdef:
CTable* ctable = table.table
int64_t c_row_group_size
if row_group_size is None or row_group_size == -1:
c_row_group_size = min(ctable.num_rows(), _DEFAULT_ROW_GROUP_SIZE)
elif row_group_size == 0:
raise ValueError('Row group size cannot be 0')
else:
c_row_group_size = row_group_size
with nogil:
check_status(self.writer.get()
.WriteTable(deref(ctable), c_row_group_size))
def add_key_value_metadata(self, key_value_metadata):
cdef:
shared_ptr[const CKeyValueMetadata] c_metadata
c_metadata = pyarrow_unwrap_metadata(KeyValueMetadata(key_value_metadata))
with nogil:
check_status(self.writer.get()
.AddKeyValueMetadata(c_metadata))
@property
def metadata(self):
cdef:
shared_ptr[CFileMetaData] metadata
FileMetaData result
with nogil:
metadata = self.writer.get().metadata()
if metadata:
result = FileMetaData()
result.init(metadata)
return result
raise RuntimeError(
'file metadata is only available after writer close')
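# Illustrative sketch (assumes the public pyarrow.parquet wrapper; the file
# name is hypothetical): the cdef ParquetWriter above is normally driven
# through pyarrow.parquet, e.g.
#
#   import pyarrow as pa
#   import pyarrow.parquet as pq
#   table = pa.table({"id": [1, 2, 3]})
#   with pq.ParquetWriter("out.parquet", table.schema) as writer:
#       writer.write_table(table)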