# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++

from collections.abc import Sequence
from textwrap import indent
import warnings

from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
check_status,
MemoryPool, maybe_unbox_memory_pool,
Table, KeyValueMetadata,
pyarrow_wrap_chunked_array,
pyarrow_wrap_schema,
pyarrow_unwrap_metadata,
pyarrow_unwrap_schema,
pyarrow_wrap_table,
pyarrow_wrap_batch,
pyarrow_wrap_scalar,
NativeFile, get_reader, get_writer,
string_to_timeunit)

from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
_stringify_path,
tobytes, frombytes, is_threading_enabled)

cimport cpython as cp

_DEFAULT_ROW_GROUP_SIZE = 1024*1024
_MAX_ROW_GROUP_SIZE = 64*1024*1024
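# Both constants are row counts, not byte sizes: 2**20 rows as the default
# row-group length and 2**26 rows as the upper bound applied by the writer.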


cdef class Statistics(_Weakrefable):
    """Statistics for a single column in a single row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
return """{}
has_min_max: {}
min: {}
max: {}
null_count: {}
distinct_count: {}
num_values: {}
physical_type: {}
logical_type: {}
converted_type (legacy): {}""".format(object.__repr__(self),
self.has_min_max,
self.min,
self.max,
self.null_count,
self.distinct_count,
self.num_values,
self.physical_type,
str(self.logical_type),
self.converted_type)
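
    # A minimal usage sketch (the file name is hypothetical); Statistics
    # objects are obtained from row-group metadata rather than constructed
    # directly:
    #
    #   import pyarrow.parquet as pq
    #   meta = pq.ParquetFile("example.parquet").metadata
    #   stats = meta.row_group(0).column(0).statistics  # may be None
    #   print(stats)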

    def to_dict(self):
        """
        Get dictionary representation of statistics.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class.
        """
        d = dict(
has_min_max=self.has_min_max,
min=self.min,
max=self.max,
null_count=self.null_count,
distinct_count=self.distinct_count,
num_values=self.num_values,
physical_type=self.physical_type
)
return d
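
    # For example (hedged sketch, reusing the hypothetical `stats` above),
    # the returned dictionary mirrors the scalar properties of this class:
    #
    #   d = stats.to_dict()
    #   d["null_count"], d["min"], d["max"]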

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, Statistics other):
        """
        Return whether the two column statistics objects are equal.

        Parameters
        ----------
        other : Statistics
            Statistics to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.statistics.get().Equals(deref(other.statistics.get()))
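
    # Equality delegates to the underlying C++ Statistics::Equals, so two
    # wrappers over the same column chunk compare equal (hedged sketch,
    # hypothetical `meta` from the example above):
    #
    #   a = meta.row_group(0).column(0).statistics
    #   b = meta.row_group(0).column(0).statistics
    #   assert a.equals(b) and a == b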

    @property
    def has_min_max(self):
        """Whether min and max are present (bool)."""
        return self.statistics.get().HasMinMax()

    @property
    def has_null_count(self):
        """Whether null count is present (bool)."""
        return self.statistics.get().HasNullCount()

    @property
    def has_distinct_count(self):
        """Whether distinct count is present (bool)."""
        return self.statistics.get().HasDistinctCount()

    @property
    def min_raw(self):
        """Min value as physical type (bool, int, float, or bytes)."""
        if self.has_min_max:
            return _cast_statistic_raw_min(self.statistics.get())
        else:
            return None

    @property
    def max_raw(self):
        """Max value as physical type (bool, int, float, or bytes)."""
        if self.has_min_max:
            return _cast_statistic_raw_max(self.statistics.get())
        else:
            return None
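
    # The *_raw properties return the physical representation, while min/max
    # below decode to the logical one; e.g. for a date32 column (hedged
    # sketch, hypothetical `stats`):
    #
    #   stats.min_raw  # 18262 (INT32 days since the Unix epoch)
    #   stats.min      # datetime.date(2020, 1, 1)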

    @property
    def min(self):
        """
        Min value as logical type.

        Returned as the Python equivalent of the logical type, such as
        datetime.date for dates and decimal.Decimal for decimals.
        """
        if self.has_min_max:
            min_scalar, _ = _cast_statistics(self.statistics.get())
            return min_scalar.as_py()
        else:
            return None

    @property
    def max(self):
        """
        Max value as logical type.

        Returned as the Python equivalent of the logical type, such as
        datetime.date for dates and decimal.Decimal for decimals.
        """
        if self.has_min_max:
            _, max_scalar = _cast_statistics(self.statistics.get())
            return max_scalar.as_py()
        else:
            return None

    @property
    def null_count(self):
        """Number of null values in chunk (int)."""
        if self.has_null_count:
            return self.statistics.get().null_count()
        else:
            return None

    @property
    def distinct_count(self):
        """Number of distinct values in chunk (int)."""
        if self.has_distinct_count:
            return self.statistics.get().distinct_count()
        else:
            return None

    @property
    def num_values(self):
        """Number of non-null values (int)."""
        return self.statistics.get().num_values()

    @property
    def physical_type(self):
        """Physical type of column (str)."""
        raw_physical_type = self.statistics.get().physical_type()
        return physical_type_name_from_enum(raw_physical_type)

    @property
    def logical_type(self):
        """Logical type of column (:class:`ParquetLogicalType`)."""
        return wrap_logical_type(self.statistics.get().descr().logical_type())

    @property
    def converted_type(self):
        """Legacy converted type (str or None)."""
        raw_converted_type = self.statistics.get().descr().converted_type()
        return converted_type_name_from_enum(raw_converted_type)
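
# The three type views complement one another; for a timestamp[ms] column one
# would typically see (hedged sketch, exact strings may vary by version):
#
#   stats.physical_type      # "INT64"
#   str(stats.logical_type)  # e.g. "Timestamp(timeUnit=milliseconds, ...)"
#   stats.converted_type     # "TIMESTAMP_MILLIS"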


cdef class ParquetLogicalType(_Weakrefable):
    """Logical type of a Parquet column."""

    cdef:
        shared_ptr[const CParquetLogicalType] type

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
        self.type = type

    def __repr__(self):
        return "{}\n {}".format(object.__repr__(self), str(self))

    def __str__(self):
        return frombytes(self.type.get().ToString(), safe=True)

    def to_json(self):
        """
        Get a JSON string containing type and type parameters.

        Returns
        -------
        json : str
            JSON representation of the type, with at least a field called
            'Type' that contains the type name. If the type is parameterized,
            such as a decimal with scale and precision, those parameters are
            included as fields as well.
        """
        return frombytes(self.type.get().ToJSON())

    @property
    def type(self):
        """Name of the logical type (str)."""
        return logical_type_name_from_enum(self.type.get().type())
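
# to_json() makes parameterized types easy to inspect (hedged sketch; the
# exact field layout comes from the C++ ToJSON and may vary):
#
#   lt = stats.logical_type
#   lt.to_json()  # e.g. '{"Type": "Decimal", "precision": 10, "scale": 2}'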


cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
    cdef ParquetLogicalType out = ParquetLogicalType()
    out.init(type)
    return out


cdef _cast_statistic_raw_min(CStatistics* statistics):
cdef ParquetType physical_type = statistics.physical_type()
cdef uint32_t type_length = statistics.descr().type_length()
if physical_type == ParquetType_BOOLEAN:
return (<CBoolStatistics*> statistics).min()
elif physical_type == ParquetType_INT32:
return (<CInt32Statistics*> statistics).min()
elif physical_type == ParquetType_INT64:
return (<CInt64Statistics*> statistics).min()
elif physical_type == ParquetType_FLOAT:
return (<CFloatStatistics*> statistics).min()
elif physical_type == ParquetType_DOUBLE:
return (<CDoubleStatistics*> statistics).min()
elif physical_type == ParquetType_BYTE_ARRAY:
return _box_byte_array((<CByteArrayStatistics*> statistics).min())
elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)


cdef _cast_statistic_raw_max(CStatistics* statistics):
cdef ParquetType physical_type = statistics.physical_type()
cdef uint32_t type_length = statistics.descr().type_length()
if physical_type == ParquetType_BOOLEAN:
return (<CBoolStatistics*> statistics).max()
elif physical_type == ParquetType_INT32:
return (<CInt32Statistics*> statistics).max()
elif physical_type == ParquetType_INT64:
return (<CInt64Statistics*> statistics).max()
elif physical_type == ParquetType_FLOAT:
return (<CFloatStatistics*> statistics).max()
elif physical_type == ParquetType_DOUBLE:
return (<CDoubleStatistics*> statistics).max()
elif physical_type == ParquetType_BYTE_ARRAY:
return _box_byte_array((<CByteArrayStatistics*> statistics).max())
elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)
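
# Both raw casts dispatch on the physical type, so the returned Python value
# mirrors the on-disk encoding: BYTE_ARRAY and FIXED_LEN_BYTE_ARRAY come back
# as bytes, e.g. (hedged sketch) stats.max_raw == b"zebra" for a string
# column; INT96 and any unknown physical type fall through and yield None.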


cdef _cast_statistics(CStatistics* statistics):
    cdef:
        shared_ptr[CScalar] c_min
        shared_ptr[CScalar] c_max
    check_status(StatisticsAsScalars(statistics[0], &c_min, &c_max))
    return (pyarrow_wrap_scalar(c_min), pyarrow_wrap_scalar(c_max))


cdef _box_byte_array(ParquetByteArray val):
    return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> val.len)


cdef _box_flba(ParquetFLBA val, uint32_t len):
    return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)
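
# PyBytes_FromStringAndSize copies the buffer, so the bytes returned by both
# helpers remain valid after the underlying statistics object is freed.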


cdef class ColumnChunkMetaData(_Weakrefable):
    """Column metadata for a single row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        statistics = indent(repr(self.statistics), 4 * ' ')
return """{0}
file_offset: {1}
file_path: {2}
physical_type: {3}
num_values: {4}
path_in_schema: {5}
is_stats_set: {6}
statistics:
{7}
compression: {8}
encodings: {9}
has_dictionary_page: {10}
dictionary_page_offset: {11}
data_page_offset: {12}
total_compressed_size: {13}
total_uncompressed_size: {14}""".format(object.__repr__(self),
self.file_offset,
self.file_path,
self.physical_type,
self.num_values,
self.path_in_schema,
self.is_stats_set,
statistics,
                                          self.compression,
                                          self.encodings,
                                          self.has_dictionary_page,
                                          self.dictionary_page_offset,
                                          self.data_page_offset,
                                          self.total_compressed_size,
                                          self.total_uncompressed_size)