Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

/ _orc.pyx

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: profile=False
# distutils: language = c++

from cython.operator cimport dereference as deref
from libcpp.vector cimport vector as std_vector
from libcpp.utility cimport move
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport *
from pyarrow.lib cimport (check_status, _Weakrefable,
                          MemoryPool, maybe_unbox_memory_pool,
                          pyarrow_wrap_schema,
                          pyarrow_wrap_batch,
                          Table,
                          pyarrow_wrap_table,
                          pyarrow_wrap_metadata,
                          pyarrow_unwrap_table,
                          get_reader,
                          get_writer)
from pyarrow.lib import frombytes, tobytes
from pyarrow.util import _stringify_path


cdef compression_type_from_enum(CCompressionType compression_type):
    """
    Translate a CCompressionType value into its ORC compression name.

    Returns one of 'UNCOMPRESSED', 'ZLIB', 'SNAPPY', 'LZ4' or 'ZSTD'.
    Raises ValueError for any other compression type.
    """
    if compression_type == CCompressionType_UNCOMPRESSED:
        return 'UNCOMPRESSED'
    # The GZIP codec is reported under the name 'ZLIB'.
    if compression_type == CCompressionType_GZIP:
        return 'ZLIB'
    if compression_type == CCompressionType_SNAPPY:
        return 'SNAPPY'
    if compression_type == CCompressionType_LZ4:
        return 'LZ4'
    if compression_type == CCompressionType_ZSTD:
        return 'ZSTD'
    raise ValueError('Unsupported compression')


cdef CCompressionType compression_type_from_name(name) except *:
    """
    Translate an ORC compression name into a CCompressionType value.

    The name is matched case-insensitively against 'UNCOMPRESSED',
    'ZLIB', 'SNAPPY', 'LZ4' and 'ZSTD'.

    Raises TypeError if `name` is not a str, and ValueError if the
    upper-cased name is not one of the names above.
    """
    if not isinstance(name, str):
        raise TypeError('compression must be a string')
    name = name.upper()
    if name == 'UNCOMPRESSED':
        return CCompressionType_UNCOMPRESSED
    # 'ZLIB' selects the GZIP codec.
    if name == 'ZLIB':
        return CCompressionType_GZIP
    if name == 'SNAPPY':
        return CCompressionType_SNAPPY
    if name == 'LZ4':
        return CCompressionType_LZ4
    if name == 'ZSTD':
        return CCompressionType_ZSTD
    raise ValueError(f'Unknown CompressionKind: {name}')


cdef compression_strategy_from_enum(
    CompressionStrategy compression_strategy
):
    """
    Translate a CompressionStrategy value into its name.

    Returns 'SPEED' or 'COMPRESSION'; raises ValueError for any other
    value.
    """
    if compression_strategy == _CompressionStrategy_SPEED:
        return 'SPEED'
    if compression_strategy == _CompressionStrategy_COMPRESSION:
        return 'COMPRESSION'
    raise ValueError('Unsupported compression strategy')


cdef CompressionStrategy compression_strategy_from_name(name) except *:
    """
    Translate a strategy name into a CompressionStrategy value.

    The name is matched case-insensitively against 'SPEED' and
    'COMPRESSION'.

    Raises TypeError if `name` is not a str, and ValueError if the
    upper-cased name is neither of the two.
    """
    if not isinstance(name, str):
        raise TypeError('compression strategy must be a string')
    name = name.upper()
    if name == 'SPEED':
        return _CompressionStrategy_SPEED
    if name == 'COMPRESSION':
        return _CompressionStrategy_COMPRESSION
    raise ValueError(f'Unknown CompressionStrategy: {name}')


cdef file_version_from_class(FileVersion file_version):
    """Render a FileVersion via its ToString() and decode the result."""
    encoded = file_version.ToString()
    return frombytes(encoded)


cdef writer_id_from_enum(WriterId writer_id):
    """
    Translate a WriterId value into the name of the writer.

    Raises ValueError when the ID is not one of the writers listed in
    the table below.
    """
    known_writers = {
        _WriterId_ORC_JAVA_WRITER: 'ORC_JAVA',
        _WriterId_ORC_CPP_WRITER: 'ORC_CPP',
        _WriterId_PRESTO_WRITER: 'PRESTO',
        _WriterId_SCRITCHLEY_GO: 'SCRITCHLEY_GO',
        _WriterId_TRINO_WRITER: 'TRINO',
    }
    # No table value is None, so None reliably signals a missing key.
    label = known_writers.get(writer_id)
    if label is None:
        raise ValueError('Unsupported writer ID')
    return label


cdef writer_version_from_enum(WriterVersion writer_version):
    """
    Translate a WriterVersion value into its name.

    Raises ValueError when the version is not listed in the table
    below.
    """
    known_versions = {
        _WriterVersion_ORIGINAL: 'ORIGINAL',
        _WriterVersion_HIVE_8732: 'HIVE_8732',
        _WriterVersion_HIVE_4243: 'HIVE_4243',
        _WriterVersion_HIVE_12055: 'HIVE_12055',
        _WriterVersion_HIVE_13083: 'HIVE_13083',
        _WriterVersion_ORC_101: 'ORC_101',
        _WriterVersion_ORC_135: 'ORC_135',
        _WriterVersion_ORC_517: 'ORC_517',
        _WriterVersion_ORC_203: 'ORC_203',
        _WriterVersion_ORC_14: 'ORC_14',
    }
    # No table value is None, so None reliably signals a missing key.
    label = known_versions.get(writer_version)
    if label is None:
        raise ValueError('Unsupported writer version')
    return label


cdef shared_ptr[WriteOptions] _create_write_options(
    file_version=None,
    batch_size=None,
    stripe_size=None,
    compression=None,
    compression_block_size=None,
    compression_strategy=None,
    row_index_stride=None,
    padding_tolerance=None,
    dictionary_key_size_threshold=None,
    bloom_filter_columns=None,
    bloom_filter_fpp=None
) except *:
    """
    Build a WriteOptions object from Python-level ORC writer settings.

    Every argument is optional. An argument left as None is skipped, so
    the corresponding field of the freshly created WriteOptions is not
    assigned by this function.

    Parameters
    ----------
    file_version : {"0.11", "0.12"}, optional
    batch_size : int, optional
        Must be strictly positive.
    stripe_size : int, optional
        Must be strictly positive.
    compression : str, optional
        A name accepted by ``compression_type_from_name``.
    compression_block_size : int, optional
        Must be strictly positive.
    compression_strategy : str, optional
        A name accepted by ``compression_strategy_from_name``.
    row_index_stride : int, optional
        Must be strictly positive.
    padding_tolerance : optional
        Any value convertible with ``float()``.
    dictionary_key_size_threshold : optional
        Any value convertible with ``float()`` that lies in [0, 1].
    bloom_filter_columns : iterable of int, optional
        Each entry must be a non-negative int.
    bloom_filter_fpp : optional
        Any value convertible with ``float()`` that lies in [0, 1].

    Returns
    -------
    shared_ptr[WriteOptions]

    Raises
    ------
    TypeError
        If `compression` or `compression_strategy` is not a str.
    ValueError
        If any other argument fails its validation, or if a compression
        or compression strategy name is not recognised.
    """
    cdef:
        shared_ptr[WriteOptions] options
    options = make_shared[WriteOptions]()
    # batch_size
    if batch_size is not None:
        if isinstance(batch_size, int) and batch_size > 0:
            deref(options).batch_size = batch_size
        else:
            raise ValueError(f"Invalid ORC writer batch size: {batch_size}")
    # file_version
    if file_version is not None:
        if file_version == "0.12":
            deref(options).file_version = FileVersion(0, 12)
        elif file_version == "0.11":
            deref(options).file_version = FileVersion(0, 11)
        else:
            raise ValueError(f"Unsupported ORC file version: {file_version}")
    # stripe_size
    if stripe_size is not None:
        if isinstance(stripe_size, int) and stripe_size > 0:
            deref(options).stripe_size = stripe_size
        else:
            raise ValueError(f"Invalid ORC stripe size: {stripe_size}")
    # compression
    if compression is not None:
        if isinstance(compression, str):
            deref(options).compression = compression_type_from_name(
                compression)
        else:
            raise TypeError("Unsupported ORC compression type: "
                            f"{compression}")
    # compression_block_size
    if compression_block_size is not None:
        if (isinstance(compression_block_size, int) and
                compression_block_size > 0):
            deref(options).compression_block_size = compression_block_size
        else:
            raise ValueError("Invalid ORC compression block size: "
                             f"{compression_block_size}")
    # compression_strategy
    if compression_strategy is not None:
        # Validate the strategy itself. Checking `compression` here would
        # reject a valid strategy whenever no compression was supplied.
        if isinstance(compression_strategy, str):
            deref(options).compression_strategy = \
                compression_strategy_from_name(compression_strategy)
        else:
            raise TypeError("Unsupported ORC compression strategy: "
                            f"{compression_strategy}")
    # row_index_stride
    if row_index_stride is not None:
        if isinstance(row_index_stride, int) and row_index_stride > 0:
            deref(options).row_index_stride = row_index_stride
        else:
            raise ValueError("Invalid ORC row index stride: "
                             f"{row_index_stride}")
    # padding_tolerance
    if padding_tolerance is not None:
        try:
            padding_tolerance = float(padding_tolerance)
            deref(options).padding_tolerance = padding_tolerance
        except Exception:
            raise ValueError("Invalid ORC padding tolerance: "
                             f"{padding_tolerance}")
    # dictionary_key_size_threshold
    if dictionary_key_size_threshold is not None:
        try:
            dictionary_key_size_threshold = float(
                dictionary_key_size_threshold)
            # Explicit range check rather than `assert`: assertions can be
            # compiled out, which would let bad values through unnoticed.
            # The handler below turns this into the user-facing message.
            if not 0 <= dictionary_key_size_threshold <= 1:
                raise ValueError("out of range")
            deref(options).dictionary_key_size_threshold = \
                dictionary_key_size_threshold
        except Exception:
            raise ValueError("Invalid ORC dictionary key size threshold: "
                             f"{dictionary_key_size_threshold}")
    # bloom_filter_columns
    if bloom_filter_columns is not None:
        try:
            bloom_filter_columns = list(bloom_filter_columns)
            for col in bloom_filter_columns:
                # Explicit check rather than `assert` (see above).
                if not (isinstance(col, int) and col >= 0):
                    raise ValueError("invalid column index")
            deref(options).bloom_filter_columns = bloom_filter_columns
        except Exception:
            raise ValueError("Invalid ORC BloomFilter columns: "
                             f"{bloom_filter_columns}")
    # Max false positive rate of the Bloom Filter
    if bloom_filter_fpp is not None:
        try:
            bloom_filter_fpp = float(bloom_filter_fpp)
            # Explicit check rather than `assert` (see above).
            if not 0 <= bloom_filter_fpp <= 1:
                raise ValueError("out of range")
            deref(options).bloom_filter_fpp = bloom_filter_fpp
        except Exception:
            raise ValueError("Invalid ORC BloomFilter false positive rate: "
                             f"{bloom_filter_fpp}")
    return options


cdef class ORCReader(_Weakrefable):
    cdef:
        object source
        CMemoryPool* allocator
        unique_ptr[ORCFileReader] reader

    def __cinit__(self, MemoryPool memory_pool=None):
        # Resolve the (possibly None) MemoryPool wrapper to the C-level
        # pool pointer that open() later hands to ORCFileReader.Open.
        cdef CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
        self.allocator = pool

    def open(self, object source, c_bool use_memory_map=True):
        """
        Open `source` and attach an ORCFileReader to this object.

        Parameters
        ----------
        source : object
            Passed to ``get_reader`` to obtain a random access file; also
            stored on the instance as ``self.source``.
        use_memory_map : bool, default True
            Forwarded to ``get_reader``.
        """
        cdef shared_ptr[CRandomAccessFile] file_handle

        # Stored before the file is opened, exactly as callers may rely on.
        self.source = source

        get_reader(source, use_memory_map, &file_handle)
        with nogil:
            self.reader = move(
                GetResultValue(
                    ORCFileReader.Open(file_handle, self.allocator)
                )
            )

    def metadata(self):
        """
        Read the Arrow key-value metadata stored in this file.

        Returns
        -------
        metadata : pyarrow.KeyValueMetadata
        """
        cdef shared_ptr[const CKeyValueMetadata] c_metadata

        # The read goes through the C++ reader with the GIL released.
        with nogil:
            c_metadata = GetResultValue(deref(self.reader).ReadMetadata())

        return pyarrow_wrap_metadata(c_metadata)

    def schema(self):
        """
        Read the Arrow schema of this file.

        Returns
        -------
        schema : pyarrow.Schema
        """
        cdef shared_ptr[CSchema] c_schema

        # The read goes through the C++ reader with the GIL released.
        with nogil:
            c_schema = GetResultValue(
                deref(self.reader).ReadSchema()
            )

        return pyarrow_wrap_schema(c_schema)

    def nrows(self):
        """Return the reader's ``NumberOfRows()`` value."""
        row_count = deref(self.reader).NumberOfRows()
        return row_count

    def nstripes(self):
        """Return the reader's ``NumberOfStripes()`` value."""
        stripe_count = deref(self.reader).NumberOfStripes()
        return stripe_count

    def file_version(self):
        """
        Return the reader's ``GetFileVersion()`` result, rendered by
        ``file_version_from_class``.
        """
        return file_version_from_class(deref(self.reader).GetFileVersion())

    def software_version(self):
        """Return the reader's ``GetSoftwareVersion()`` value, decoded."""
        raw_version = deref(self.reader).GetSoftwareVersion()
        return frombytes(raw_version)

    def compression(self):
        """
        Return the name of the compression reported by the reader.

        The value of ``GetCompression()`` is translated to a name by
        ``compression_type_from_enum``.
        """
        cdef CCompressionType codec = GetResultValue(
            deref(self.reader).GetCompression())
        return compression_type_from_enum(codec)

    def compression_size(self):
        """Return the reader's ``GetCompressionSize()`` value."""
        block_size = deref(self.reader).GetCompressionSize()
        return block_size

    def row_index_stride(self):
        """Return the reader's ``GetRowIndexStride()`` value."""
        stride = deref(self.reader).GetRowIndexStride()
        return stride

    def writer(self):
        """
        Identify the software that wrote this file.

        Returns
        -------
        writer : str or number
            The writer's name when ``writer_id_from_enum`` recognises the
            ID reported by the reader. Otherwise the reader's raw
            ``GetWriterIdValue()`` is returned.
        """
        try:
            writer_name = writer_id_from_enum(
                deref(self.reader).GetWriterId())
        except ValueError:
            # writer_id_from_enum raises for IDs outside its table, so
            # without this handler the fallback below could never run.
            writer_name = 'UNKNOWN'
        if writer_name == 'UNKNOWN':
            return deref(self.reader).GetWriterIdValue()
        return writer_name

    def writer_version(self):
        """
        Return the name of the writer version reported by the reader.

        The value of ``GetWriterVersion()`` is translated to a name by
        ``writer_version_from_enum``.
        """
        cdef WriterVersion version = deref(self.reader).GetWriterVersion()
        return writer_version_from_enum(version)

    def nstripe_statistics(self):
        """Return the reader's ``GetNumberOfStripeStatistics()`` value."""
        statistics_count = deref(self.reader).GetNumberOfStripeStatistics()
        return statistics_count

    def content_length(self):
        """Return the reader's ``GetContentLength()`` value."""
        length = deref(self.reader).GetContentLength()
        return length

    def stripe_statistics_length(self):
        """Return the reader's ``GetStripeStatisticsLength()`` value."""
        length = deref(self.reader).GetStripeStatisticsLength()
        return length

    def file_footer_length(self):
        """Return the reader's ``GetFileFooterLength()`` value."""
        length = deref(self.reader).GetFileFooterLength()
        return length

    def file_postscript_length(self):
        """Return the reader's ``GetFilePostscriptLength()`` value."""
        length = deref(self.reader).GetFilePostscriptLength()
        return length

    def file_length(self):
        """Return the reader's ``GetFileLength()`` value."""
        length = deref(self.reader).GetFileLength()
        return length

    def serialized_file_tail(self):
        """Return the reader's ``GetSerializedFileTail()`` value."""
        tail = deref(self.reader).GetSerializedFileTail()
        return tail

    def read_stripe(self, n, columns=None):
Loading ...