Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ _parquet.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++

from collections.abc import Sequence
from textwrap import indent
import warnings

from cython.operator cimport dereference as deref
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_python cimport *
from pyarrow.lib cimport (_Weakrefable, Buffer, Schema,
                          check_status,
                          MemoryPool, maybe_unbox_memory_pool,
                          Table, NativeFile,
                          pyarrow_wrap_chunked_array,
                          pyarrow_wrap_schema,
                          pyarrow_unwrap_schema,
                          pyarrow_wrap_table,
                          pyarrow_wrap_batch,
                          pyarrow_wrap_scalar,
                          NativeFile, get_reader, get_writer,
                          string_to_timeunit)

from pyarrow.lib import (ArrowException, NativeFile, BufferOutputStream,
                         _stringify_path,
                         tobytes, frombytes)

cimport cpython as cp

_DEFAULT_ROW_GROUP_SIZE = 1024*1024
_MAX_ROW_GROUP_SIZE = 64*1024*1024

cdef class Statistics(_Weakrefable):
    """Statistics for a single column in a single row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        return """{}
  has_min_max: {}
  min: {}
  max: {}
  null_count: {}
  distinct_count: {}
  num_values: {}
  physical_type: {}
  logical_type: {}
  converted_type (legacy): {}""".format(object.__repr__(self),
                                        self.has_min_max,
                                        self.min,
                                        self.max,
                                        self.null_count,
                                        self.distinct_count,
                                        self.num_values,
                                        self.physical_type,
                                        str(self.logical_type),
                                        self.converted_type)

    def to_dict(self):
        """
        Get dictionary representation of statistics.

        Returns
        -------
        dict
            Dictionary with a key for each attribute of this class.
        """
        d = dict(
            has_min_max=self.has_min_max,
            min=self.min,
            max=self.max,
            null_count=self.null_count,
            distinct_count=self.distinct_count,
            num_values=self.num_values,
            physical_type=self.physical_type
        )
        return d

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return NotImplemented

    def equals(self, Statistics other):
        """
        Return whether the two column statistics objects are equal.

        Parameters
        ----------
        other : Statistics
            Statistics to compare against.

        Returns
        -------
        are_equal : bool
        """
        return self.statistics.get().Equals(deref(other.statistics.get()))

    @property
    def has_min_max(self):
        """Whether min and max are present (bool)."""
        return self.statistics.get().HasMinMax()

    @property
    def has_null_count(self):
        """Whether null count is present (bool)."""
        return self.statistics.get().HasNullCount()

    @property
    def has_distinct_count(self):
        """Whether distinct count is preset (bool)."""
        return self.statistics.get().HasDistinctCount()

    @property
    def min_raw(self):
        """Min value as physical type (bool, int, float, or bytes)."""
        if self.has_min_max:
            return _cast_statistic_raw_min(self.statistics.get())
        else:
            return None

    @property
    def max_raw(self):
        """Max value as physical type (bool, int, float, or bytes)."""
        if self.has_min_max:
            return _cast_statistic_raw_max(self.statistics.get())
        else:
            return None

    @property
    def min(self):
        """
        Min value as logical type.

        Returned as the Python equivalent of logical type, such as datetime.date
        for dates and decimal.Decimal for decimals.
        """
        if self.has_min_max:
            min_scalar, _ = _cast_statistics(self.statistics.get())
            return min_scalar.as_py()
        else:
            return None

    @property
    def max(self):
        """
        Max value as logical type.

        Returned as the Python equivalent of logical type, such as datetime.date
        for dates and decimal.Decimal for decimals.
        """
        if self.has_min_max:
            _, max_scalar = _cast_statistics(self.statistics.get())
            return max_scalar.as_py()
        else:
            return None

    @property
    def null_count(self):
        """Number of null values in chunk (int)."""
        if self.has_null_count:
            return self.statistics.get().null_count()
        else:
            return None

    @property
    def distinct_count(self):
        """Distinct number of values in chunk (int)."""
        if self.has_distinct_count:
            return self.statistics.get().distinct_count()
        else:
            return None

    @property
    def num_values(self):
        """Number of non-null values (int)."""
        return self.statistics.get().num_values()

    @property
    def physical_type(self):
        """Physical type of column (str)."""
        raw_physical_type = self.statistics.get().physical_type()
        return physical_type_name_from_enum(raw_physical_type)

    @property
    def logical_type(self):
        """Logical type of column (:class:`ParquetLogicalType`)."""
        return wrap_logical_type(self.statistics.get().descr().logical_type())

    @property
    def converted_type(self):
        """Legacy converted type (str or None)."""
        raw_converted_type = self.statistics.get().descr().converted_type()
        return converted_type_name_from_enum(raw_converted_type)


cdef class ParquetLogicalType(_Weakrefable):
    """Logical type of parquet type."""
    cdef:
        shared_ptr[const CParquetLogicalType] type

    def __cinit__(self):
        pass

    cdef init(self, const shared_ptr[const CParquetLogicalType]& type):
        self.type = type

    def __repr__(self):
        return "{}\n  {}".format(object.__repr__(self), str(self))

    def __str__(self):
        return frombytes(self.type.get().ToString(), safe=True)

    def to_json(self):
        """
        Get a JSON string containing type and type parameters.

        Returns
        -------
        json : str
            JSON representation of type, with at least a field called 'Type'
            which contains the type name. If the type is parameterized, such
            as a decimal with scale and precision, will contain those as fields
            as well.
        """
        return frombytes(self.type.get().ToJSON())

    @property
    def type(self):
        """Name of the logical type (str)."""
        return logical_type_name_from_enum(self.type.get().type())


cdef wrap_logical_type(const shared_ptr[const CParquetLogicalType]& type):
    cdef ParquetLogicalType out = ParquetLogicalType()
    out.init(type)
    return out


cdef _cast_statistic_raw_min(CStatistics* statistics):
    cdef ParquetType physical_type = statistics.physical_type()
    cdef uint32_t type_length = statistics.descr().type_length()
    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).min()
    elif physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).min()
    elif physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).min()
    elif physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).min()
    elif physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).min()
    elif physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).min())
    elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).min(), type_length)


cdef _cast_statistic_raw_max(CStatistics* statistics):
    cdef ParquetType physical_type = statistics.physical_type()
    cdef uint32_t type_length = statistics.descr().type_length()
    if physical_type == ParquetType_BOOLEAN:
        return (<CBoolStatistics*> statistics).max()
    elif physical_type == ParquetType_INT32:
        return (<CInt32Statistics*> statistics).max()
    elif physical_type == ParquetType_INT64:
        return (<CInt64Statistics*> statistics).max()
    elif physical_type == ParquetType_FLOAT:
        return (<CFloatStatistics*> statistics).max()
    elif physical_type == ParquetType_DOUBLE:
        return (<CDoubleStatistics*> statistics).max()
    elif physical_type == ParquetType_BYTE_ARRAY:
        return _box_byte_array((<CByteArrayStatistics*> statistics).max())
    elif physical_type == ParquetType_FIXED_LEN_BYTE_ARRAY:
        return _box_flba((<CFLBAStatistics*> statistics).max(), type_length)


cdef _cast_statistics(CStatistics* statistics):
    cdef:
        shared_ptr[CScalar] c_min
        shared_ptr[CScalar] c_max
    check_status(StatisticsAsScalars(statistics[0], &c_min, &c_max))
    return (pyarrow_wrap_scalar(c_min), pyarrow_wrap_scalar(c_max))


cdef _box_byte_array(ParquetByteArray val):
    return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> val.len)


cdef _box_flba(ParquetFLBA val, uint32_t len):
    return cp.PyBytes_FromStringAndSize(<char*> val.ptr, <Py_ssize_t> len)


cdef class ColumnChunkMetaData(_Weakrefable):
    """Column metadata for a single row group."""

    def __cinit__(self):
        pass

    def __repr__(self):
        statistics = indent(repr(self.statistics), 4 * ' ')
        return """{0}
  file_offset: {1}
  file_path: {2}
  physical_type: {3}
  num_values: {4}
  path_in_schema: {5}
  is_stats_set: {6}
  statistics:
{7}
  compression: {8}
  encodings: {9}
  has_dictionary_page: {10}
  dictionary_page_offset: {11}
  data_page_offset: {12}
  total_compressed_size: {13}
  total_uncompressed_size: {14}""".format(object.__repr__(self),
                                          self.file_offset,
                                          self.file_path,
                                          self.physical_type,
                                          self.num_values,
                                          self.path_in_schema,
                                          self.is_stats_set,
                                          statistics,
                                          self.compression,
Loading ...