Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ include / parquet / test_util.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

// This module defines an abstract interface for iterating through pages in a
// Parquet column chunk within a row group. It could be extended in the future
// to iterate through all data pages in all chunks in a file.

#pragma once

#include <algorithm>
#include <limits>
#include <memory>
#include <random>
#include <string>
#include <utility>
#include <vector>

#include <gtest/gtest.h>

#include "arrow/io/memory.h"
#include "arrow/testing/util.h"
#include "arrow/util/float16.h"

#include "parquet/column_page.h"
#include "parquet/column_reader.h"
#include "parquet/column_writer.h"
#include "parquet/encoding.h"
#include "parquet/platform.h"

// https://github.com/google/googletest/pull/2904 might not be available
// in our version of gtest/gmock
#define EXPECT_THROW_THAT(callable, ex_type, property)   \
  EXPECT_THROW(                                          \
      try { (callable)(); } catch (const ex_type& err) { \
        EXPECT_THAT(err, (property));                    \
        throw;                                           \
      },                                                 \
      ex_type)

namespace parquet {

static constexpr int FLBA_LENGTH = 12;

inline bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
  return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
}

namespace test {

typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType,
                         DoubleType, ByteArrayType, FLBAType>
    ParquetTypes;

class ParquetTestException : public parquet::ParquetException {
  using ParquetException::ParquetException;
};

const char* get_data_dir();
std::string get_bad_data_dir();

std::string get_data_file(const std::string& filename, bool is_good = true);

template <typename T>
static inline void assert_vector_equal(const std::vector<T>& left,
                                       const std::vector<T>& right) {
  ASSERT_EQ(left.size(), right.size());

  for (size_t i = 0; i < left.size(); ++i) {
    ASSERT_EQ(left[i], right[i]) << i;
  }
}

template <typename T>
static inline bool vector_equal(const std::vector<T>& left, const std::vector<T>& right) {
  if (left.size() != right.size()) {
    return false;
  }

  for (size_t i = 0; i < left.size(); ++i) {
    if (left[i] != right[i]) {
      std::cerr << "index " << i << " left was " << left[i] << " right was " << right[i]
                << std::endl;
      return false;
    }
  }

  return true;
}

template <typename T>
static std::vector<T> slice(const std::vector<T>& values, int start, int end) {
  if (end < start) {
    return std::vector<T>(0);
  }

  std::vector<T> out(end - start);
  for (int i = start; i < end; ++i) {
    out[i - start] = values[i];
  }
  return out;
}

void random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out);
void random_bools(int n, double p, uint32_t seed, bool* out);

template <typename T>
inline void random_numbers(int n, uint32_t seed, T min_value, T max_value, T* out) {
  std::default_random_engine gen(seed);
  std::uniform_int_distribution<T> d(min_value, max_value);
  for (int i = 0; i < n; ++i) {
    out[i] = d(gen);
  }
}

template <>
inline void random_numbers(int n, uint32_t seed, float min_value, float max_value,
                           float* out) {
  std::default_random_engine gen(seed);
  std::uniform_real_distribution<float> d(min_value, max_value);
  for (int i = 0; i < n; ++i) {
    out[i] = d(gen);
  }
}

template <>
inline void random_numbers(int n, uint32_t seed, double min_value, double max_value,
                           double* out) {
  std::default_random_engine gen(seed);
  std::uniform_real_distribution<double> d(min_value, max_value);
  for (int i = 0; i < n; ++i) {
    out[i] = d(gen);
  }
}

void random_Int96_numbers(int n, uint32_t seed, int32_t min_value, int32_t max_value,
                          Int96* out);

void random_float16_numbers(int n, uint32_t seed, ::arrow::util::Float16 min_value,
                            ::arrow::util::Float16 max_value, uint16_t* out);

void random_fixed_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out);

void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int min_size,
                       int max_size);

void random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out, int max_size);

void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, ByteArray* out,
                                int min_size, int max_size, double prefixed_probability);

void prefixed_random_byte_array(int n, uint32_t seed, uint8_t* buf, int len, FLBA* out,
                                double prefixed_probability);

template <typename Type, typename Sequence>
std::shared_ptr<Buffer> EncodeValues(Encoding::type encoding, bool use_dictionary,
                                     const Sequence& values, int length,
                                     const ColumnDescriptor* descr) {
  auto encoder = MakeTypedEncoder<Type>(encoding, use_dictionary, descr);
  encoder->Put(values, length);
  return encoder->FlushValues();
}

template <typename T>
static void InitValues(int num_values, uint32_t seed, std::vector<T>& values,
                       std::vector<uint8_t>& buffer) {
  random_numbers(num_values, seed, std::numeric_limits<T>::min(),
                 std::numeric_limits<T>::max(), values.data());
}

template <typename T>
static void InitValues(int num_values, std::vector<T>& values,
                       std::vector<uint8_t>& buffer) {
  InitValues(num_values, 0, values, buffer);
}

template <typename T>
static void InitDictValues(int num_values, int num_dicts, std::vector<T>& values,
                           std::vector<uint8_t>& buffer) {
  int repeat_factor = num_values / num_dicts;
  InitValues<T>(num_dicts, values, buffer);
  // add some repeated values
  for (int j = 1; j < repeat_factor; ++j) {
    for (int i = 0; i < num_dicts; ++i) {
      std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T));
    }
  }
  // computed only dict_per_page * repeat_factor - 1 values < num_values
  // compute remaining
  for (int i = num_dicts * repeat_factor; i < num_values; ++i) {
    std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T));
  }
}

template <>
inline void InitDictValues<bool>(int num_values, int num_dicts, std::vector<bool>& values,
                                 std::vector<uint8_t>& buffer) {
  // No op for bool
}

class MockPageReader : public PageReader {
 public:
  explicit MockPageReader(const std::vector<std::shared_ptr<Page>>& pages)
      : pages_(pages), page_index_(0) {}

  std::shared_ptr<Page> NextPage() override {
    if (page_index_ == static_cast<int>(pages_.size())) {
      // EOS to consumer
      return std::shared_ptr<Page>(nullptr);
    }
    return pages_[page_index_++];
  }

  // No-op
  void set_max_page_header_size(uint32_t size) override {}

 private:
  std::vector<std::shared_ptr<Page>> pages_;
  int page_index_;
};

// TODO(wesm): this is only used for testing for now. Refactor to form part of
// primary file write path
template <typename Type>
class DataPageBuilder {
 public:
  using c_type = typename Type::c_type;

  // This class writes data and metadata to the passed inputs
  explicit DataPageBuilder(ArrowOutputStream* sink)
      : sink_(sink),
        num_values_(0),
        encoding_(Encoding::PLAIN),
        definition_level_encoding_(Encoding::RLE),
        repetition_level_encoding_(Encoding::RLE),
        have_def_levels_(false),
        have_rep_levels_(false),
        have_values_(false) {}

  void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level,
                       Encoding::type encoding = Encoding::RLE) {
    AppendLevels(levels, max_level, encoding);

    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
    definition_level_encoding_ = encoding;
    have_def_levels_ = true;
  }

  void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level,
                       Encoding::type encoding = Encoding::RLE) {
    AppendLevels(levels, max_level, encoding);

    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
    repetition_level_encoding_ = encoding;
    have_rep_levels_ = true;
  }

  void AppendValues(const ColumnDescriptor* d, const std::vector<c_type>& values,
                    Encoding::type encoding = Encoding::PLAIN) {
    std::shared_ptr<Buffer> values_sink = EncodeValues<Type>(
        encoding, false, values.data(), static_cast<int>(values.size()), d);
    PARQUET_THROW_NOT_OK(sink_->Write(values_sink->data(), values_sink->size()));

    num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
    encoding_ = encoding;
    have_values_ = true;
  }

  int32_t num_values() const { return num_values_; }

  Encoding::type encoding() const { return encoding_; }

  Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }

  Encoding::type def_level_encoding() const { return definition_level_encoding_; }

 private:
  ArrowOutputStream* sink_;

  int32_t num_values_;
  Encoding::type encoding_;
  Encoding::type definition_level_encoding_;
  Encoding::type repetition_level_encoding_;

  bool have_def_levels_;
  bool have_rep_levels_;
  bool have_values_;

  // Used internally for both repetition and definition levels
  void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
                    Encoding::type encoding) {
    if (encoding != Encoding::RLE) {
      ParquetException::NYI("only rle encoding currently implemented");
    }

    std::vector<uint8_t> encode_buffer(LevelEncoder::MaxBufferSize(
        Encoding::RLE, max_level, static_cast<int>(levels.size())));

    // We encode into separate memory from the output stream because the
    // RLE-encoded bytes have to be preceded in the stream by their absolute
    // size.
    LevelEncoder encoder;
    encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
                 encode_buffer.data(), static_cast<int>(encode_buffer.size()));

    encoder.Encode(static_cast<int>(levels.size()), levels.data());

    int32_t rle_bytes = encoder.len();
    PARQUET_THROW_NOT_OK(
        sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t)));
    PARQUET_THROW_NOT_OK(sink_->Write(encode_buffer.data(), rle_bytes));
  }
};

template <>
inline void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor* d,
                                                       const std::vector<bool>& values,
                                                       Encoding::type encoding) {
  if (encoding != Encoding::PLAIN) {
    ParquetException::NYI("only plain encoding currently implemented");
  }

  auto encoder = MakeTypedEncoder<BooleanType>(Encoding::PLAIN, false, d);
  dynamic_cast<BooleanEncoder*>(encoder.get())
      ->Put(values, static_cast<int>(values.size()));
  std::shared_ptr<Buffer> buffer = encoder->FlushValues();
  PARQUET_THROW_NOT_OK(sink_->Write(buffer->data(), buffer->size()));

  num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
  encoding_ = encoding;
  have_values_ = true;
}
Loading ...