// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <memory>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include "arrow/io/caching.h"
#include "arrow/type.h"
#include "arrow/util/compression.h"
#include "arrow/util/type_fwd.h"
#include "parquet/encryption/encryption.h"
#include "parquet/exception.h"
#include "parquet/parquet_version.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
#include "parquet/type_fwd.h"
#include "parquet/types.h"
namespace parquet {
/// Selects the serialization format used for data pages.
///
/// parquet-format v2.0.0 added a second data page metadata type, DataPageV2,
/// together with a different serialized page structure (for example, encoded
/// levels are no longer compressed).
///
/// Compatibility notes:
/// - Before PARQUET-457 was completed in 2020, this library did not implement
///   DataPageV2 correctly. Files written with the V2 data page format may
///   therefore be unreadable by older versions of the library.
/// - Some Parquet implementations do not implement DataPageV2 at all.
enum class ParquetDataPageVersion {
  V1,
  V2,
};
/// Default buffer size: four 4096-byte units (16384), so that the buffer is
/// aligned to a small multiple of a page size.
constexpr int64_t kDefaultBufferSize = 4 * 4096;

/// Default size limit applied to thrift strings.
constexpr int32_t kDefaultThriftStringSizeLimit = 100'000'000;

// Default size limit applied to thrift containers.
//
// Structs in the thrift definition are relatively large (at least 300 bytes),
// so this element count keeps total memory within the same order of magnitude
// as kDefaultThriftStringSizeLimit.
constexpr int32_t kDefaultThriftContainerSizeLimit = 1'000'000;
class PARQUET_EXPORT ReaderProperties {
public:
explicit ReaderProperties(MemoryPool* pool = ::arrow::default_memory_pool())
: pool_(pool) {}
MemoryPool* memory_pool() const { return pool_; }
std::shared_ptr<ArrowInputStream> GetStream(std::shared_ptr<ArrowInputFile> source,
int64_t start, int64_t num_bytes);
/// Buffered stream reading allows the user to control the memory usage of
/// parquet readers. This ensure that all `RandomAccessFile::ReadAt` calls are
/// wrapped in a buffered reader that uses a fix sized buffer (of size
/// `buffer_size()`) instead of the full size of the ReadAt.
///
/// The primary reason for this control knobs is for resource control and not
/// performance.
bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
/// Enable buffered stream reading.
void enable_buffered_stream() { buffered_stream_enabled_ = true; }
/// Disable buffered stream reading.
void disable_buffered_stream() { buffered_stream_enabled_ = false; }
bool read_dense_for_nullable() const { return read_dense_for_nullable_; }
void enable_read_dense_for_nullable() { read_dense_for_nullable_ = true; }
void disable_read_dense_for_nullable() { read_dense_for_nullable_ = false; }
/// Return the size of the buffered stream buffer.
int64_t buffer_size() const { return buffer_size_; }
/// Set the size of the buffered stream buffer in bytes.
void set_buffer_size(int64_t size) { buffer_size_ = size; }
/// \brief Return the size limit on thrift strings.
///
/// This limit helps prevent space and time bombs in files, but may need to
/// be increased in order to read files with especially large headers.
int32_t thrift_string_size_limit() const { return thrift_string_size_limit_; }
/// Set the size limit on thrift strings.
void set_thrift_string_size_limit(int32_t size) { thrift_string_size_limit_ = size; }
/// \brief Return the size limit on thrift containers.
///
/// This limit helps prevent space and time bombs in files, but may need to
/// be increased in order to read files with especially large headers.
int32_t thrift_container_size_limit() const { return thrift_container_size_limit_; }
/// Set the size limit on thrift containers.
void set_thrift_container_size_limit(int32_t size) {
thrift_container_size_limit_ = size;
}
/// Set the decryption properties.
void file_decryption_properties(std::shared_ptr<FileDecryptionProperties> decryption) {
file_decryption_properties_ = std::move(decryption);
}
/// Return the decryption properties.
const std::shared_ptr<FileDecryptionProperties>& file_decryption_properties() const {
return file_decryption_properties_;
}
bool page_checksum_verification() const { return page_checksum_verification_; }
void set_page_checksum_verification(bool check_crc) {
page_checksum_verification_ = check_crc;
}
private:
MemoryPool* pool_;
int64_t buffer_size_ = kDefaultBufferSize;
int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
bool buffered_stream_enabled_ = false;
bool page_checksum_verification_ = false;
// Used with a RecordReader.
bool read_dense_for_nullable_ = false;
std::shared_ptr<FileDecryptionProperties> file_decryption_properties_;
};
/// Return a ReaderProperties object by value. Only the declaration lives in
/// this header; the definition is elsewhere.
/// NOTE(review): presumably a default-configured instance -- confirm against
/// the definition.
ReaderProperties PARQUET_EXPORT default_reader_properties();
// Writer defaults expressed with built-in types only.
//
// A namespace-scope `constexpr` variable already has internal linkage, so no
// explicit `static` is spelled out here.

/// Default data page size (1024 * 1024).
constexpr int64_t kDefaultDataPageSize = 1024 * 1024;

/// Dictionary encoding is enabled by default.
constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;

/// The default dictionary page size limit equals the default data page size.
constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT = kDefaultDataPageSize;

/// Default write batch size.
constexpr int64_t DEFAULT_WRITE_BATCH_SIZE = 1024;

/// Default maximum row group length (1024 * 1024).
constexpr int64_t DEFAULT_MAX_ROW_GROUP_LENGTH = 1024 * 1024;

/// Statistics are enabled by default.
constexpr bool DEFAULT_ARE_STATISTICS_ENABLED = true;

/// Default maximum statistics size.
constexpr int64_t DEFAULT_MAX_STATISTICS_SIZE = 4096;
// Writer defaults that depend on project-declared types and macros.

/// Default column encoding.
/// NOTE(review): Encoding::UNKNOWN is presumably a "no explicit encoding
/// requested" sentinel -- confirm against the writer.
constexpr Encoding::type DEFAULT_ENCODING = Encoding::UNKNOWN;

/// Default "created by" string, taken from the CREATED_BY_VERSION macro.
static const char DEFAULT_CREATED_BY[] = CREATED_BY_VERSION;

/// Columns are written uncompressed by default.
constexpr Compression::type DEFAULT_COMPRESSION_TYPE = Compression::UNCOMPRESSED;

/// The page index is disabled by default.
constexpr bool DEFAULT_IS_PAGE_INDEX_ENABLED = false;
class PARQUET_EXPORT ColumnProperties {
public:
ColumnProperties(Encoding::type encoding = DEFAULT_ENCODING,
Compression::type codec = DEFAULT_COMPRESSION_TYPE,
bool dictionary_enabled = DEFAULT_IS_DICTIONARY_ENABLED,
bool statistics_enabled = DEFAULT_ARE_STATISTICS_ENABLED,
size_t max_stats_size = DEFAULT_MAX_STATISTICS_SIZE,
bool page_index_enabled = DEFAULT_IS_PAGE_INDEX_ENABLED)
: encoding_(encoding),
codec_(codec),
dictionary_enabled_(dictionary_enabled),
statistics_enabled_(statistics_enabled),
max_stats_size_(max_stats_size),
page_index_enabled_(page_index_enabled) {}
void set_encoding(Encoding::type encoding) { encoding_ = encoding; }
void set_compression(Compression::type codec) { codec_ = codec; }
void set_dictionary_enabled(bool dictionary_enabled) {
dictionary_enabled_ = dictionary_enabled;
}
void set_statistics_enabled(bool statistics_enabled) {
statistics_enabled_ = statistics_enabled;
}
void set_max_statistics_size(size_t max_stats_size) {
max_stats_size_ = max_stats_size;
}
void set_compression_level(int compression_level) {
if (!codec_options_) {
codec_options_ = std::make_shared<CodecOptions>();
}
codec_options_->compression_level = compression_level;
}
void set_codec_options(const std::shared_ptr<CodecOptions>& codec_options) {
codec_options_ = codec_options;
}
void set_page_index_enabled(bool page_index_enabled) {
page_index_enabled_ = page_index_enabled;
}
Encoding::type encoding() const { return encoding_; }
Compression::type compression() const { return codec_; }
bool dictionary_enabled() const { return dictionary_enabled_; }
bool statistics_enabled() const { return statistics_enabled_; }
size_t max_statistics_size() const { return max_stats_size_; }
int compression_level() const {
if (!codec_options_) {
return ::arrow::util::kUseDefaultCompressionLevel;
}
return codec_options_->compression_level;
}
const std::shared_ptr<CodecOptions>& codec_options() const { return codec_options_; }
bool page_index_enabled() const { return page_index_enabled_; }
private:
Encoding::type encoding_;
Compression::type codec_;
bool dictionary_enabled_;
bool statistics_enabled_;
size_t max_stats_size_;
std::shared_ptr<CodecOptions> codec_options_;
bool page_index_enabled_;
};
class PARQUET_EXPORT WriterProperties {
public:
class Builder {
public:
Builder()
: pool_(::arrow::default_memory_pool()),
dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
max_row_group_length_(DEFAULT_MAX_ROW_GROUP_LENGTH),
pagesize_(kDefaultDataPageSize),
version_(ParquetVersion::PARQUET_2_6),
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY),
store_decimal_as_integer_(false),
page_checksum_enabled_(false) {}
explicit Builder(const WriterProperties& properties)
: pool_(properties.memory_pool()),
dictionary_pagesize_limit_(properties.dictionary_pagesize_limit()),
write_batch_size_(properties.write_batch_size()),
max_row_group_length_(properties.max_row_group_length()),
pagesize_(properties.data_pagesize()),
version_(properties.version()),
data_page_version_(properties.data_page_version()),
created_by_(properties.created_by()),
store_decimal_as_integer_(properties.store_decimal_as_integer()),
page_checksum_enabled_(properties.page_checksum_enabled()),
sorting_columns_(properties.sorting_columns()),
default_column_properties_(properties.default_column_properties()) {}
virtual ~Builder() {}
/// Specify the memory pool for the writer. Default default_memory_pool.
Builder* memory_pool(MemoryPool* pool) {
pool_ = pool;
return this;
}
/// Enable dictionary encoding in general for all columns. Default
/// enabled.
Builder* enable_dictionary() {
default_column_properties_.set_dictionary_enabled(true);
return this;
}
/// Disable dictionary encoding in general for all columns. Default
/// enabled.
Builder* disable_dictionary() {
default_column_properties_.set_dictionary_enabled(false);
return this;
}
/// Enable dictionary encoding for column specified by `path`. Default
/// enabled.
Builder* enable_dictionary(const std::string& path) {
dictionary_enabled_[path] = true;
return this;
}
/// Enable dictionary encoding for column specified by `path`. Default
/// enabled.
Builder* enable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
return this->enable_dictionary(path->ToDotString());
}
/// Disable dictionary encoding for column specified by `path`. Default
/// enabled.
Builder* disable_dictionary(const std::string& path) {
dictionary_enabled_[path] = false;
return this;
}
/// Disable dictionary encoding for column specified by `path`. Default
/// enabled.
Builder* disable_dictionary(const std::shared_ptr<schema::ColumnPath>& path) {
return this->disable_dictionary(path->ToDotString());
}
/// Specify the dictionary page size limit per row group. Default 1MB.
Builder* dictionary_pagesize_limit(int64_t dictionary_psize_limit) {
dictionary_pagesize_limit_ = dictionary_psize_limit;
return this;
}
/// Specify the write batch size while writing batches of Arrow values
/// into Parquet. Default 1024.
Builder* write_batch_size(int64_t write_batch_size) {
write_batch_size_ = write_batch_size;
return this;
}
/// Specify the max number of rows to put in a single row group.
/// Default 1Mi rows.
Builder* max_row_group_length(int64_t max_row_group_length) {
max_row_group_length_ = max_row_group_length;
return this;
}
/// Specify the data page size.
/// Default 1MB.
Builder* data_pagesize(int64_t pg_size) {
pagesize_ = pg_size;
return this;
}
/// Specify the data page version.
/// Default V1.
Builder* data_page_version(ParquetDataPageVersion data_page_version) {
data_page_version_ = data_page_version;
return this;
}
/// Specify the Parquet file version.
/// Default PARQUET_2_6.
Builder* version(ParquetVersion::type version) {
version_ = version;
return this;
}
Builder* created_by(const std::string& created_by) {
Loading ...