Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev251 

/ include / parquet / arrow / reader.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
// N.B. we don't include async_generator.h as it's relatively heavy
#include <functional>
#include <memory>
#include <vector>

#include "parquet/file_reader.h"
#include "parquet/platform.h"
#include "parquet/properties.h"

namespace arrow {

class ChunkedArray;
class KeyValueMetadata;
class RecordBatchReader;
struct Scalar;
class Schema;
class Table;
class RecordBatch;

}  // namespace arrow

namespace parquet {

class FileMetaData;
class SchemaDescriptor;

namespace arrow {

class ColumnChunkReader;
class ColumnReader;
struct SchemaManifest;
class RowGroupReader;

/// \brief Arrow read adapter class for deserializing Parquet files as Arrow row batches.
///
/// This interfaces caters for different use cases and thus provides different
/// interfaces. In its most simplistic form, we cater for a user that wants to
/// read the whole Parquet at once with the `FileReader::ReadTable` method.
///
/// More advanced users that also want to implement parallelism on top of each
/// single Parquet files should do this on the RowGroup level. For this, they can
/// call `FileReader::RowGroup(i)->ReadTable` to receive only the specified
/// RowGroup as a table.
///
/// In the most advanced situation, where a consumer wants to independently read
/// RowGroups in parallel and consume each column individually, they can call
/// `FileReader::RowGroup(i)->Column(j)->Read` and receive an `arrow::Column`
/// instance.
///
/// Finally, one can also get a stream of record batches using
/// `FileReader::GetRecordBatchReader()`. This can internally decode columns
/// in parallel if use_threads was enabled in the ArrowReaderProperties.
///
/// The parquet format supports an optional integer field_id which can be assigned
/// to a field.  Arrow will convert these field IDs to a metadata key named
/// PARQUET:field_id on the appropriate field.
// TODO(wesm): nested data does not always make sense with this user
// interface unless you are only reading a single leaf node from a branch of
// a table. For example:
//
// repeated group data {
//   optional group record {
//     optional int32 val1;
//     optional byte_array val2;
//     optional bool val3;
//   }
//   optional int32 val4;
// }
//
// In the Parquet file, there are 4 leaf nodes:
//
// * data.record.val1
// * data.record.val2
// * data.record.val3
// * data.val4
//
// When materializing this data in an Arrow array, we would have:
//
// data: list<struct<
//   record: struct<
//    val1: int32,
//    val2: string (= list<uint8>),
//    val3: bool,
//   >,
//   val4: int32
// >>
//
// However, in the Parquet format, each leaf node has its own repetition and
// definition levels describing the structure of the intermediate nodes in
// this array structure. Thus, we will need to scan the leaf data for a group
// of leaf nodes part of the same type tree to create a single result Arrow
// nested array structure.
//
// This is additionally complicated "chunky" repeated fields or very large byte
// arrays
class PARQUET_EXPORT FileReader {
 public:
  /// Factory function to create a FileReader from a ParquetFileReader and properties
  static ::arrow::Status Make(::arrow::MemoryPool* pool,
                              std::unique_ptr<ParquetFileReader> reader,
                              const ArrowReaderProperties& properties,
                              std::unique_ptr<FileReader>* out);

  /// Factory function to create a FileReader from a ParquetFileReader
  static ::arrow::Status Make(::arrow::MemoryPool* pool,
                              std::unique_ptr<ParquetFileReader> reader,
                              std::unique_ptr<FileReader>* out);

  // Since the distribution of columns amongst a Parquet file's row groups may
  // be uneven (the number of values in each column chunk can be different), we
  // provide a column-oriented read interface. The ColumnReader hides the
  // details of paging through the file's row groups and yielding
  // fully-materialized arrow::Array instances
  //
  // Returns error status if the column of interest is not flat.
  // The indicated column index is relative to the schema
  virtual ::arrow::Status GetColumn(int i, std::unique_ptr<ColumnReader>* out) = 0;

  /// \brief Return arrow schema for all the columns.
  virtual ::arrow::Status GetSchema(std::shared_ptr<::arrow::Schema>* out) = 0;

  /// \brief Read column as a whole into a chunked array.
  ///
  /// The index i refers the index of the top level schema field, which may
  /// be nested or flat - e.g.
  ///
  /// 0 foo.bar
  ///   foo.bar.baz
  ///   foo.qux
  /// 1 foo2
  /// 2 foo3
  ///
  /// i=0 will read the entire foo struct, i=1 the foo2 primitive column etc
  virtual ::arrow::Status ReadColumn(int i,
                                     std::shared_ptr<::arrow::ChunkedArray>* out) = 0;

  /// \brief Return a RecordBatchReader of all row groups and columns.
  ///
  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
  ::arrow::Status GetRecordBatchReader(std::unique_ptr<::arrow::RecordBatchReader>* out);

  /// \brief Return a RecordBatchReader of all row groups and columns.
  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
  GetRecordBatchReader() = 0;

  /// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
  ///
  /// Note that the ordering in row_group_indices matters. FileReaders must outlive
  /// their RecordBatchReaders.
  ///
  /// \returns error Status if row_group_indices contains an invalid index
  ///
  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
  virtual ::arrow::Status GetRecordBatchReader(
      const std::vector<int>& row_group_indices,
      std::unique_ptr<::arrow::RecordBatchReader>* out);

  /// \brief Return a RecordBatchReader of row groups selected from row_group_indices.
  ///
  /// Note that the ordering in row_group_indices matters. FileReaders must outlive
  /// their RecordBatchReaders.
  ///
  /// \returns error Result if row_group_indices contains an invalid index
  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
  GetRecordBatchReader(const std::vector<int>& row_group_indices) = 0;

  /// \brief Return a RecordBatchReader of row groups selected from
  /// row_group_indices, whose columns are selected by column_indices.
  ///
  /// Note that the ordering in row_group_indices and column_indices
  /// matter. FileReaders must outlive their RecordBatchReaders.
  ///
  /// \returns error Status if either row_group_indices or column_indices
  ///     contains an invalid index
  ///
  /// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
  ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
  virtual ::arrow::Status GetRecordBatchReader(
      const std::vector<int>& row_group_indices, const std::vector<int>& column_indices,
      std::unique_ptr<::arrow::RecordBatchReader>* out);

  /// \brief Return a RecordBatchReader of row groups selected from
  /// row_group_indices, whose columns are selected by column_indices.
  ///
  /// Note that the ordering in row_group_indices and column_indices
  /// matter. FileReaders must outlive their RecordBatchReaders.
  ///
  /// \returns error Result if either row_group_indices or column_indices
  ///     contains an invalid index
  virtual ::arrow::Result<std::unique_ptr<::arrow::RecordBatchReader>>
  GetRecordBatchReader(const std::vector<int>& row_group_indices,
                       const std::vector<int>& column_indices) = 0;

  /// \brief Return a RecordBatchReader of row groups selected from
  /// row_group_indices, whose columns are selected by column_indices.
  ///
  /// Note that the ordering in row_group_indices and column_indices
  /// matter. FileReaders must outlive their RecordBatchReaders.
  ///
  /// \param row_group_indices which row groups to read (order determines read order).
  /// \param column_indices which columns to read (order determines output schema).
  /// \param[out] out record batch stream from parquet data.
  ///
  /// \returns error Status if either row_group_indices or column_indices
  ///     contains an invalid index
  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                       const std::vector<int>& column_indices,
                                       std::shared_ptr<::arrow::RecordBatchReader>* out);
  ::arrow::Status GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                       std::shared_ptr<::arrow::RecordBatchReader>* out);
  ::arrow::Status GetRecordBatchReader(std::shared_ptr<::arrow::RecordBatchReader>* out);

  /// \brief Return a generator of record batches.
  ///
  /// The FileReader must outlive the generator, so this requires that you pass in a
  /// shared_ptr.
  ///
  /// \returns error Result if either row_group_indices or column_indices contains an
  ///     invalid index
  virtual ::arrow::Result<
      std::function<::arrow::Future<std::shared_ptr<::arrow::RecordBatch>>()>>
  GetRecordBatchGenerator(std::shared_ptr<FileReader> reader,
                          const std::vector<int> row_group_indices,
                          const std::vector<int> column_indices,
                          ::arrow::internal::Executor* cpu_executor = NULLPTR,
                          int64_t rows_to_readahead = 0) = 0;

  /// Read all columns into a Table
  virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Read the given columns into a Table
  ///
  /// The indicated column indices are relative to the internal representation
  /// of the parquet table. For instance :
  /// 0 foo.bar
  ///       foo.bar.baz           0
  ///       foo.bar.baz2          1
  ///   foo.qux                   2
  /// 1 foo2                      3
  /// 2 foo3                      4
  ///
  /// i=0 will read foo.bar.baz, i=1 will read only foo.bar.baz2 and so on.
  /// Only leaf fields have indices; foo itself doesn't have an index.
  /// To get the index for a particular leaf field, one can use
  /// manifest().schema_fields to get the top level fields, and then walk the
  /// tree to identify the relevant leaf fields and access its column_index.
  /// To get the total number of leaf fields, use FileMetadata.num_columns().
  virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
                                    std::shared_ptr<::arrow::Table>* out) = 0;

  virtual ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
                                       std::shared_ptr<::arrow::Table>* out) = 0;

  virtual ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out) = 0;

  virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                        const std::vector<int>& column_indices,
                                        std::shared_ptr<::arrow::Table>* out) = 0;

  virtual ::arrow::Status ReadRowGroups(const std::vector<int>& row_groups,
                                        std::shared_ptr<::arrow::Table>* out) = 0;

  /// \brief Scan file contents with one thread, return number of rows
  virtual ::arrow::Status ScanContents(std::vector<int> columns,
                                       const int32_t column_batch_size,
                                       int64_t* num_rows) = 0;

  /// \brief Return a reader for the RowGroup, this object must not outlive the
  ///   FileReader.
  virtual std::shared_ptr<RowGroupReader> RowGroup(int row_group_index) = 0;

  /// \brief The number of row groups in the file
  virtual int num_row_groups() const = 0;

  virtual ParquetFileReader* parquet_reader() const = 0;

  /// Set whether to use multiple threads during reads of multiple columns.
  /// By default only one thread is used.
  virtual void set_use_threads(bool use_threads) = 0;

  /// Set number of records to read per batch for the RecordBatchReader.
  virtual void set_batch_size(int64_t batch_size) = 0;

  virtual const ArrowReaderProperties& properties() const = 0;

  virtual const SchemaManifest& manifest() const = 0;

  virtual ~FileReader() = default;
};

class RowGroupReader {
 public:
  virtual ~RowGroupReader() = default;
  virtual std::shared_ptr<ColumnChunkReader> Column(int column_index) = 0;
  virtual ::arrow::Status ReadTable(const std::vector<int>& column_indices,
                                    std::shared_ptr<::arrow::Table>* out) = 0;
  virtual ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out) = 0;

 private:
  struct Iterator;
};

class ColumnChunkReader {
 public:
  virtual ~ColumnChunkReader() = default;
  virtual ::arrow::Status Read(std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
};

// At this point, the column reader is a stream iterator. It only knows how to
// read the next batch of values for a particular column from the file until it
// runs out.
//
// We also do not expose any internal Parquet details, such as row groups. This
// might change in the future.
class PARQUET_EXPORT ColumnReader {
 public:
  virtual ~ColumnReader() = default;

  // Scan the next array of the indicated size. The actual size of the
  // returned array may be less than the passed size depending how much data is
  // available in the file.
  //
  // When all the data in the file has been exhausted, the result is set to
  // nullptr.
  //
  // Returns Status::OK on a successful read, including if you have exhausted
  // the data available in the file.
  virtual ::arrow::Status NextBatch(int64_t batch_size,
                                    std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
};

/// \brief Experimental helper class for bindings (like Python) that struggle
/// either with std::move or C++ exceptions
class PARQUET_EXPORT FileReaderBuilder {
 public:
  FileReaderBuilder();

  /// Create FileReaderBuilder from Arrow file and optional properties / metadata
  ::arrow::Status Open(std::shared_ptr<::arrow::io::RandomAccessFile> file,
                       const ReaderProperties& properties = default_reader_properties(),
                       std::shared_ptr<FileMetaData> metadata = NULLPTR);

  /// Create FileReaderBuilder from file path and optional properties / metadata
  ::arrow::Status OpenFile(const std::string& path, bool memory_map = false,
                           const ReaderProperties& props = default_reader_properties(),
                           std::shared_ptr<FileMetaData> metadata = NULLPTR);

  ParquetFileReader* raw_reader() { return raw_reader_.get(); }

  /// Set Arrow MemoryPool for memory allocation
  FileReaderBuilder* memory_pool(::arrow::MemoryPool* pool);
  /// Set Arrow reader properties
  FileReaderBuilder* properties(const ArrowReaderProperties& arg_properties);
  /// Build FileReader instance
  ::arrow::Status Build(std::unique_ptr<FileReader>* out);
  ::arrow::Result<std::unique_ptr<FileReader>> Build();

 private:
  ::arrow::MemoryPool* pool_;
  ArrowReaderProperties properties_;
  std::unique_ptr<ParquetFileReader> raw_reader_;
};

/// \defgroup parquet-arrow-reader-factories Factory functions for Parquet Arrow readers
///
/// @{

/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
///
/// \deprecated Deprecated in 19.0.0. Use arrow::Result version instead.
ARROW_DEPRECATED("Deprecated in 19.0.0. Use arrow::Result version instead.")
PARQUET_EXPORT
::arrow::Status OpenFile(std::shared_ptr<::arrow::io::RandomAccessFile>,
                         ::arrow::MemoryPool* allocator,
                         std::unique_ptr<FileReader>* reader);

/// \brief Build FileReader from Arrow file and MemoryPool
///
/// Advanced settings are supported through the FileReaderBuilder class.
PARQUET_EXPORT
::arrow::Result<std::unique_ptr<FileReader>> OpenFile(
    std::shared_ptr<::arrow::io::RandomAccessFile>, ::arrow::MemoryPool* allocator);

/// @}

PARQUET_EXPORT
::arrow::Status StatisticsAsScalars(const Statistics& Statistics,
                                    std::shared_ptr<::arrow::Scalar>* min,
                                    std::shared_ptr<::arrow::Scalar>* max);

namespace internal {

PARQUET_EXPORT
::arrow::Status FuzzReader(const uint8_t* data, int64_t size);

}  // namespace internal
}  // namespace arrow
}  // namespace parquet