// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <array>
#include <chrono>
#include <cstdint>
#include <cstring>
#include <memory>
#include <optional>
#include <string>
#include <vector>
#include "parquet/column_reader.h"
#include "parquet/file_reader.h"
#include "parquet/stream_writer.h"
namespace parquet {
/// \brief A class for reading Parquet files using an output stream type API.
///
/// The values given must be of the correct type i.e. the type must
/// match the file schema exactly otherwise a ParquetException will be
/// thrown.
///
/// The user must explicitly advance to the next row using the
/// EndRow() function or EndRow input manipulator.
///
/// Required and optional fields are supported:
/// - Required fields are read using operator>>(T)
/// - Optional fields are read with
/// operator>>(std::optional<T>)
///
/// Note that operator>>(std::optional<T>) can be used to read
/// required fields.
///
/// Similarly operator>>(T) can be used to read optional fields.
/// However, if the value is not present then a ParquetException will
/// be raised.
///
/// Currently there is no support for repeated fields.
///
class PARQUET_EXPORT StreamReader {
public:
template <typename T>
using optional = ::std::optional<T>;
// N.B. Default constructed objects are not usable. This
// constructor is provided so that the object may be move
// assigned afterwards.
StreamReader() = default;
explicit StreamReader(std::unique_ptr<ParquetFileReader> reader);
~StreamReader() = default;
bool eof() const { return eof_; }
int current_column() const { return column_index_; }
int64_t current_row() const { return current_row_; }
int num_columns() const;
int64_t num_rows() const;
// Moving is possible.
StreamReader(StreamReader&&) = default;
StreamReader& operator=(StreamReader&&) = default;
// Copying is not allowed.
StreamReader(const StreamReader&) = delete;
StreamReader& operator=(const StreamReader&) = delete;
StreamReader& operator>>(bool& v);
StreamReader& operator>>(int8_t& v);
StreamReader& operator>>(uint8_t& v);
StreamReader& operator>>(int16_t& v);
StreamReader& operator>>(uint16_t& v);
StreamReader& operator>>(int32_t& v);
StreamReader& operator>>(uint32_t& v);
StreamReader& operator>>(int64_t& v);
StreamReader& operator>>(uint64_t& v);
StreamReader& operator>>(std::chrono::milliseconds& v);
StreamReader& operator>>(std::chrono::microseconds& v);
StreamReader& operator>>(float& v);
StreamReader& operator>>(double& v);
StreamReader& operator>>(char& v);
template <int N>
StreamReader& operator>>(char (&v)[N]) {
ReadFixedLength(v, N);
return *this;
}
template <std::size_t N>
StreamReader& operator>>(std::array<char, N>& v) {
ReadFixedLength(v.data(), static_cast<int>(N));
return *this;
}
// N.B. Cannot allow for reading to a arbitrary char pointer as the
// length cannot be verified. Also it would overshadow the
// char[N] input operator.
// StreamReader& operator>>(char * v);
StreamReader& operator>>(std::string& v);
StreamReader& operator>>(::arrow::Decimal128& v);
// Input operators for optional fields.
StreamReader& operator>>(optional<bool>& v);
StreamReader& operator>>(optional<int8_t>& v);
StreamReader& operator>>(optional<uint8_t>& v);
StreamReader& operator>>(optional<int16_t>& v);
StreamReader& operator>>(optional<uint16_t>& v);
StreamReader& operator>>(optional<int32_t>& v);
StreamReader& operator>>(optional<uint32_t>& v);
StreamReader& operator>>(optional<int64_t>& v);
StreamReader& operator>>(optional<uint64_t>& v);
StreamReader& operator>>(optional<float>& v);
StreamReader& operator>>(optional<double>& v);
StreamReader& operator>>(optional<std::chrono::milliseconds>& v);
StreamReader& operator>>(optional<std::chrono::microseconds>& v);
StreamReader& operator>>(optional<char>& v);
StreamReader& operator>>(optional<std::string>& v);
StreamReader& operator>>(optional<::arrow::Decimal128>& v);
template <std::size_t N>
StreamReader& operator>>(optional<std::array<char, N>>& v) {
CheckColumn(Type::FIXED_LEN_BYTE_ARRAY, ConvertedType::NONE, N);
FixedLenByteArray flba;
if (ReadOptional(&flba)) {
v = std::array<char, N>{};
std::memcpy(v->data(), flba.ptr, N);
} else {
v.reset();
}
return *this;
}
/// \brief Terminate current row and advance to next one.
/// \throws ParquetException if all columns in the row were not
/// read or skipped.
void EndRow();
/// \brief Skip the data in the next columns.
/// If the number of columns exceeds the columns remaining on the
/// current row then skipping is terminated - it does _not_ continue
/// skipping columns on the next row.
/// Skipping of columns still requires the use 'EndRow' even if all
/// remaining columns were skipped.
/// \return Number of columns actually skipped.
int64_t SkipColumns(int64_t num_columns_to_skip);
/// \brief Skip the data in the next rows.
/// Skipping of rows is not allowed if reading of data for the
/// current row is not finished.
/// Skipping of rows will be terminated if the end of file is
/// reached.
/// \return Number of rows actually skipped.
int64_t SkipRows(int64_t num_rows_to_skip);
protected:
[[noreturn]] void ThrowReadFailedException(
const std::shared_ptr<schema::PrimitiveNode>& node);
template <typename ReaderType, typename T>
void Read(T* v) {
const auto& node = nodes_[column_index_];
auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
int16_t def_level;
int16_t rep_level;
int64_t values_read;
reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, v, &values_read);
if (values_read != 1) {
ThrowReadFailedException(node);
}
}
template <typename ReaderType, typename ReadType, typename T>
void Read(T* v) {
const auto& node = nodes_[column_index_];
auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
int16_t def_level;
int16_t rep_level;
ReadType tmp;
int64_t values_read;
reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
if (values_read == 1) {
*v = tmp;
} else {
ThrowReadFailedException(node);
}
}
template <typename ReaderType, typename ReadType = typename ReaderType::T, typename T>
void ReadOptional(optional<T>* v) {
const auto& node = nodes_[column_index_];
auto reader = static_cast<ReaderType*>(column_readers_[column_index_++].get());
int16_t def_level;
int16_t rep_level;
ReadType tmp;
int64_t values_read;
reader->ReadBatch(kBatchSizeOne, &def_level, &rep_level, &tmp, &values_read);
if (values_read == 1) {
*v = T(tmp);
} else if ((values_read == 0) && (def_level == 0)) {
v->reset();
} else {
ThrowReadFailedException(node);
}
}
void ReadFixedLength(char* ptr, int len);
void Read(ByteArray* v);
void Read(FixedLenByteArray* v);
bool ReadOptional(ByteArray* v);
bool ReadOptional(FixedLenByteArray* v);
void NextRowGroup();
void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
int length = 0);
void SkipRowsInColumn(ColumnReader* reader, int64_t num_rows_to_skip);
void SetEof();
private:
std::unique_ptr<ParquetFileReader> file_reader_;
std::shared_ptr<FileMetaData> file_metadata_;
std::shared_ptr<RowGroupReader> row_group_reader_;
std::vector<std::shared_ptr<ColumnReader>> column_readers_;
std::vector<std::shared_ptr<schema::PrimitiveNode>> nodes_;
bool eof_{true};
int row_group_index_{0};
int column_index_{0};
int64_t current_row_{0};
int64_t row_group_row_offset_{0};
static constexpr int64_t kBatchSizeOne = 1;
}; // namespace parquet
PARQUET_EXPORT
StreamReader& operator>>(StreamReader&, EndRowType);
} // namespace parquet