// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This module provides shared helpers for Parquet tests: vector comparison
// utilities, seeded random data generators, a mock PageReader that replays a
// fixed list of pages, and a DataPageBuilder for writing data page payloads.
#pragma once
#include <algorithm>
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>
#include <gtest/gtest.h>
#include "arrow/io/memory.h"
#include "arrow/testing/util.h"
#include "arrow/util/float16.h"
#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/encoding.h"
#include "parquet/platform.h"
// EXPECT_THROW_THAT(callable, ex_type, property)
//
// Invokes `callable` with no arguments and expects it to throw `ex_type`.
// The exception is caught by const reference, checked against the matcher
// `property` with EXPECT_THAT, and then rethrown so that the enclosing
// EXPECT_THROW still observes an exception of type `ex_type`.
//
// https://github.com/google/googletest/pull/2904 might not be available
// in our version of gtest/gmock
#define EXPECT_THROW_THAT(callable, ex_type, property) \
EXPECT_THROW( \
try { (callable)(); } catch (const ex_type& err) { \
EXPECT_THAT(err, (property)); \
throw; \
}, \
ex_type)
namespace parquet {
// Number of bytes compared by the FixedLenByteArray equality operator below.
static constexpr int FLBA_LENGTH = 12;

// Two FixedLenByteArray values are equal when the first FLBA_LENGTH bytes
// behind their `ptr` members are byte-for-byte identical.
inline bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
  const int byte_order = std::memcmp(a.ptr, b.ptr, FLBA_LENGTH);
  return byte_order == 0;
}
namespace test {
// Type list used to instantiate typed gtest suites, one entry per listed
// Parquet physical type.
using ParquetTypes =
    ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
                     ByteArrayType, FLBAType>;
class ParquetTestException : public parquet::ParquetException {
using ParquetException::ParquetException;
};
// Test-data location helpers. Only the declarations are visible here, so the
// notes below are inferred from names and signatures.
//
// NOTE(review): presumably returns the directory that holds the well-formed
// Parquet test files -- confirm against the definition.
const char* get_data_dir();
// NOTE(review): presumably returns the directory that holds malformed test
// files -- confirm against the definition.
std::string get_bad_data_dir();
// NOTE(review): presumably resolves `filename` inside the good data directory
// when `is_good` is true (the default) and inside the bad one otherwise --
// confirm against the definition.
std::string get_data_file(const std::string& filename, bool is_good = true);
// Asserts (gtest, fatal) that `left` and `right` have the same size and the
// same element at every position. The failing index is appended to the gtest
// failure message.
template <typename T>
static inline void assert_vector_equal(const std::vector<T>& left,
                                       const std::vector<T>& right) {
  ASSERT_EQ(left.size(), right.size());
  const size_t count = left.size();
  size_t pos = 0;
  while (pos < count) {
    ASSERT_EQ(left[pos], right[pos]) << pos;
    ++pos;
  }
}
// Returns true when `left` and `right` have the same size and no position
// compares unequal with operator!=. On the first differing position the index
// and both elements are printed to std::cerr and false is returned. A size
// mismatch returns false without printing anything.
template <typename T>
static inline bool vector_equal(const std::vector<T>& left, const std::vector<T>& right) {
  const size_t count = left.size();
  if (count != right.size()) {
    return false;
  }
  // Advance to the first position whose elements differ (or to the end).
  size_t pos = 0;
  while (pos < count && !(left[pos] != right[pos])) {
    ++pos;
  }
  if (pos == count) {
    return true;
  }
  std::cerr << "index " << pos << " left was " << left[pos] << " right was " << right[pos]
            << std::endl;
  return false;
}
// Returns a copy of the elements of `values` at positions [start, end).
// An empty vector is returned when the range is empty or reversed
// (end <= start). The caller must keep a non-empty range inside the bounds of
// `values`; no bounds checking is performed.
template <typename T>
static std::vector<T> slice(const std::vector<T>& values, int start, int end) {
  if (end <= start) {
    return {};
  }
  return std::vector<T>(values.begin() + start, values.begin() + end);
}
// Seeded random generators for bytes and booleans. Only the declarations are
// visible here, so the notes below are inferred from the signatures.
//
// NOTE(review): presumably fills `*out` with `n` pseudo-random bytes derived
// from `seed` -- confirm against the definition.
void random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out);
// NOTE(review): presumably writes `n` booleans to `out`, with `p` acting as a
// probability parameter -- confirm its exact meaning against the definition.
void random_bools(int n, double p, uint32_t seed, bool* out);
// Writes `n` pseudo-random values to `out[0..n)`.
//
// The generator is a std::default_random_engine seeded with `seed`, so the
// same arguments produce the same sequence within one standard library
// implementation. The generic version draws from
// std::uniform_int_distribution<T>(min_value, max_value).
template <typename T>
inline void random_numbers(int n, uint32_t seed, T min_value, T max_value, T* out) {
  std::default_random_engine engine(seed);
  std::uniform_int_distribution<T> dist(min_value, max_value);
  std::generate_n(out, n, [&dist, &engine]() { return dist(engine); });
}

// float specialization: draws from std::uniform_real_distribution<float>.
template <>
inline void random_numbers<float>(int n, uint32_t seed, float min_value, float max_value,
                                  float* out) {
  std::default_random_engine engine(seed);
  std::uniform_real_distribution<float> dist(min_value, max_value);
  std::generate_n(out, n, [&dist, &engine]() { return dist(engine); });
}

// double specialization: draws from std::uniform_real_distribution<double>.
template <>
inline void random_numbers<double>(int n, uint32_t seed, double min_value,
                                   double max_value, double* out) {
  std::default_random_engine engine(seed);
  std::uniform_real_distribution<double> dist(min_value, max_value);
  std::generate_n(out, n, [&dist, &engine]() { return dist(engine); });
}
// Seeded random generators for the non-arithmetic Parquet value types. Only
// the declarations are visible here, so every note below is inferred from the
// signatures -- NOTE(review): confirm against the definitions.
//
// Presumably writes `n` Int96 values derived from integers in
// [min_value, max_value] to `out`.
void random_Int96_numbers(int n, uint32_t seed, int32_t min_value, int32_t max_value,
Int96* out);
// Presumably writes `n` half-precision values between `min_value` and
// `max_value` to `out` as raw 16-bit patterns.
void random_float16_numbers(int n, uint32_t seed, ::arrow::util::Float16 min_value,
::arrow::util::Float16 max_value, uint16_t* out);
// Presumably produces `n` fixed-length values of `len` bytes each, with `buf`
// as the backing storage the FLBA entries point into.
void random_fixed_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out);
// Presumably produces `n` variable-length values whose sizes lie between
// `min_size` and `max_size`, with `buf` as the backing storage.
void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int min_size,
int max_size);
// Overload without an explicit minimum size.
void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int max_size);
// Presumably like random_byte_array, but with `prefixed_probability`
// controlling how often a value shares a prefix with another one.
void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
int min_size, int max_size, double prefixed_probability);
// Fixed-length (FLBA) counterpart of the overload above.
void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out,
double prefixed_probability);
// Encodes `length` values from `values` with a freshly created typed encoder
// and returns the flushed buffer.
//
// `encoding`, `use_dictionary` and `descr` are forwarded unchanged to
// MakeTypedEncoder<Type>; `values` is forwarded unchanged to the encoder's
// Put().
template <typename Type, typename Sequence>
std::shared_ptr<Buffer> EncodeValues(Encoding::type encoding, bool use_dictionary,
                                     const Sequence& values, int length,
                                     const ColumnDescriptor* descr) {
  auto typed_encoder = MakeTypedEncoder<Type>(encoding, use_dictionary, descr);
  typed_encoder->Put(values, length);
  std::shared_ptr<Buffer> encoded = typed_encoder->FlushValues();
  return encoded;
}
// Fills the first `num_values` slots of `values` with pseudo-random numbers
// generated from `seed`, covering the range
// [numeric_limits<T>::min(), numeric_limits<T>::max()]. For floating-point T,
// numeric_limits<T>::min() is the smallest positive normal value, so only
// positive numbers are produced.
//
// `values` must already hold at least `num_values` elements. `buffer` is not
// used by this generic version.
template <typename T>
static void InitValues(int num_values, uint32_t seed, std::vector<T>& values,
                       std::vector<uint8_t>& buffer) {
  const T min_value = std::numeric_limits<T>::min();
  const T max_value = std::numeric_limits<T>::max();
  T* const out = values.data();
  random_numbers(num_values, seed, min_value, max_value, out);
}
// Convenience overload of InitValues that uses a fixed seed of 0, so repeated
// calls with the same arguments request the same seed.
// `values` and `buffer` are forwarded unchanged to the seeded overload.
template <typename T>
static void InitValues(int num_values, std::vector<T>& values,
std::vector<uint8_t>& buffer) {
InitValues(num_values, 0, values, buffer);
}
// Fills `values[0..num_values)` with data drawn from only `num_dicts`
// distinct slots, so that the result is suitable for dictionary encoding.
//
// The first `num_dicts` slots are initialized through InitValues; every later
// slot i receives a byte-wise copy of slot (i mod num_dicts). `values` must
// already hold enough elements, and `num_dicts` must be non-zero.
template <typename T>
static void InitDictValues(int num_values, int num_dicts, std::vector<T>& values,
                           std::vector<uint8_t>& buffer) {
  const int repeat_factor = num_values / num_dicts;
  InitValues<T>(num_dicts, values, buffer);

  // Replicate the leading block of num_dicts values (repeat_factor - 1) times,
  // one whole block per copy.
  T* const base = values.data();
  for (int copy = 1; copy < repeat_factor; ++copy) {
    std::memcpy(base + num_dicts * copy, base, sizeof(T) * num_dicts);
  }

  // The blocks above cover num_dicts * repeat_factor slots, which can be fewer
  // than num_values; fill the tail from the start of the leading block.
  const int filled = num_dicts * repeat_factor;
  for (int dst = filled; dst < num_values; ++dst) {
    std::memcpy(base + dst, base + (dst - filled), sizeof(T));
  }
}
// Specialization for bool: intentionally does nothing, leaving `values` and
// `buffer` untouched.
// NOTE(review): presumably needed because std::vector<bool> is bit-packed and
// cannot be filled with the element-wise std::memcpy of the generic version --
// confirm.
template <>
inline void InitDictValues<bool>(int num_values, int num_dicts, std::vector<bool>& values,
std::vector<uint8_t>& buffer) {
// No op for bool
}
class MockPageReader : public PageReader {
public:
explicit MockPageReader(const std::vector<std::shared_ptr<Page>>& pages)
: pages_(pages), page_index_(0) {}
std::shared_ptr<Page> NextPage() override {
if (page_index_ == static_cast<int>(pages_.size())) {
// EOS to consumer
return std::shared_ptr<Page>(nullptr);
}
return pages_[page_index_++];
}
// No-op
void set_max_page_header_size(uint32_t size) override {}
private:
std::vector<std::shared_ptr<Page>> pages_;
int page_index_;
};
// TODO(wesm): this is only used for testing for now. Refactor to form part of
// primary file write path
template <typename Type>
class DataPageBuilder {
public:
using c_type = typename Type::c_type;
// This class writes data and metadata to the passed inputs
explicit DataPageBuilder(ArrowOutputStream* sink)
: sink_(sink),
num_values_(0),
encoding_(Encoding::PLAIN),
definition_level_encoding_(Encoding::RLE),
repetition_level_encoding_(Encoding::RLE),
have_def_levels_(false),
have_rep_levels_(false),
have_values_(false) {}
void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level,
Encoding::type encoding = Encoding::RLE) {
AppendLevels(levels, max_level, encoding);
num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
definition_level_encoding_ = encoding;
have_def_levels_ = true;
}
void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level,
Encoding::type encoding = Encoding::RLE) {
AppendLevels(levels, max_level, encoding);
num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
repetition_level_encoding_ = encoding;
have_rep_levels_ = true;
}
void AppendValues(const ColumnDescriptor* d, const std::vector<c_type>& values,
Encoding::type encoding = Encoding::PLAIN) {
std::shared_ptr<Buffer> values_sink = EncodeValues<Type>(
encoding, false, values.data(), static_cast<int>(values.size()), d);
PARQUET_THROW_NOT_OK(sink_->Write(values_sink->data(), values_sink->size()));
num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
encoding_ = encoding;
have_values_ = true;
}
int32_t num_values() const { return num_values_; }
Encoding::type encoding() const { return encoding_; }
Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }
Encoding::type def_level_encoding() const { return definition_level_encoding_; }
private:
ArrowOutputStream* sink_;
int32_t num_values_;
Encoding::type encoding_;
Encoding::type definition_level_encoding_;
Encoding::type repetition_level_encoding_;
bool have_def_levels_;
bool have_rep_levels_;
bool have_values_;
// Used internally for both repetition and definition levels
void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
Encoding::type encoding) {
if (encoding != Encoding::RLE) {
ParquetException::NYI("only rle encoding currently implemented");
}
std::vector<uint8_t> encode_buffer(LevelEncoder::MaxBufferSize(
Encoding::RLE, max_level, static_cast<int>(levels.size())));
// We encode into separate memory from the output stream because the
// RLE-encoded bytes have to be preceded in the stream by their absolute
// size.
LevelEncoder encoder;
encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
encode_buffer.data(), static_cast<int>(encode_buffer.size()));
encoder.Encode(static_cast<int>(levels.size()), levels.data());
int32_t rle_bytes = encoder.len();
PARQUET_THROW_NOT_OK(
sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t)));
PARQUET_THROW_NOT_OK(sink_->Write(encode_buffer.data(), rle_bytes));
}
};
// BooleanType specialization of AppendValues: the values arrive as a
// std::vector<bool>, which is handed to the encoder's vector-based Put().
// Only Encoding::PLAIN is supported; anything else goes to
// ParquetException::NYI.
template <>
inline void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor* d,
                                                       const std::vector<bool>& values,
                                                       Encoding::type encoding) {
  if (encoding != Encoding::PLAIN) {
    ParquetException::NYI("only plain encoding currently implemented");
  }
  const int value_count = static_cast<int>(values.size());
  auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN, false, d);
  BooleanEncoder* bool_encoder = dynamic_cast<BooleanEncoder*>(encoder.get());
  bool_encoder->Put(values, value_count);

  std::shared_ptr<Buffer> encoded = encoder->FlushValues();
  PARQUET_THROW_NOT_OK(sink_->Write(encoded->data(), encoded->size()));

  num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
  encoding_ = encoding;
  have_values_ = true;
}
Loading ...