Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ include / parquet / stream_writer.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <array>
#include <chrono>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <vector>

#include "parquet/column_writer.h"
#include "parquet/file_writer.h"

namespace parquet {

/// \brief A class for writing Parquet files using an output stream type API.
///
/// The values given must be of the correct type i.e. the type must
/// match the file schema exactly otherwise a ParquetException will be
/// thrown.
///
/// The user must explicitly indicate the end of the row using the
/// EndRow() function or EndRow output manipulator.
///
/// A maximum row group size can be configured; the default size is
/// 512MB.  Alternatively, the row group size can be set to zero, in
/// which case the user creates new row groups by calling the
/// EndRowGroup() function or using the EndRowGroup output manipulator.
///
/// Required and optional fields are supported:
/// - Required fields are written using operator<<(T)
/// - Optional fields are written using
///   operator<<(std::optional<T>).
///
/// Note that operator<<(T) can be used to write optional fields.
///
/// Similarly, operator<<(std::optional<T>) can be used to
/// write required fields.  However if the optional parameter does not
/// have a value (i.e. it is nullopt) then a ParquetException will be
/// raised.
///
/// Currently there is no support for repeated fields.
///
class PARQUET_EXPORT StreamWriter {
 public:
  /// \brief Alias so that callers can spell the optional type as
  /// StreamWriter::optional<T>.
  template <typename T>
  using optional = ::std::optional<T>;

  // N.B. Default constructed objects are not usable.  This
  //      constructor is provided so that the object may be move
  //      assigned afterwards.
  StreamWriter() = default;

  /// \brief Construct a stream writer on top of a Parquet file writer.
  /// \param writer File writer to use; ownership is transferred to the
  /// StreamWriter (it is passed by value as a std::unique_ptr).
  explicit StreamWriter(std::unique_ptr<ParquetFileWriter> writer);

  ~StreamWriter() = default;

  /// \brief Set the class-wide default maximum row group size.
  /// Each StreamWriter initialises its own limit from this static
  /// default when it is constructed.
  /// NOTE(review): the definition is not in this header; presumably it
  /// only affects writers constructed after the call -- confirm.
  static void SetDefaultMaxRowGroupSize(int64_t max_size);

  /// \brief Set the maximum row group size for this writer only.
  /// A value of zero (or less) disables the size tracking done when
  /// values are written; row groups must then be ended explicitly.
  void SetMaxRowGroupSize(int64_t max_size);

  /// \brief Index of the next column to be written in the current row.
  int current_column() const { return column_index_; }

  /// \brief Current row counter.
  /// NOTE(review): presumably the number of rows completed so far;
  /// the code that advances it is not in this header.
  int64_t current_row() const { return current_row_; }

  /// \brief Number of columns handled by this writer.
  /// NOTE(review): defined out of line; presumably the column count of
  /// the file schema -- confirm.
  int num_columns() const;

  // Moving is possible.
  StreamWriter(StreamWriter&&) = default;
  StreamWriter& operator=(StreamWriter&&) = default;

  // Copying is not allowed.
  StreamWriter(const StreamWriter&) = delete;
  StreamWriter& operator=(const StreamWriter&) = delete;

  /// \brief Output operators for required fields.
  /// These can also be used for optional fields when a value must be set.
  StreamWriter& operator<<(bool v);

  StreamWriter& operator<<(int8_t v);

  StreamWriter& operator<<(uint8_t v);

  StreamWriter& operator<<(int16_t v);

  StreamWriter& operator<<(uint16_t v);

  StreamWriter& operator<<(int32_t v);

  StreamWriter& operator<<(uint32_t v);

  StreamWriter& operator<<(int64_t v);

  StreamWriter& operator<<(uint64_t v);

  StreamWriter& operator<<(const std::chrono::milliseconds& v);

  StreamWriter& operator<<(const std::chrono::microseconds& v);

  StreamWriter& operator<<(float v);

  StreamWriter& operator<<(double v);

  StreamWriter& operator<<(char v);

  /// \brief Helper class to write fixed length strings.
  /// This is useful as the standard string view (such as
  /// std::string_view) is for variable length data.
  ///
  /// The view is non-owning: it only stores a pointer and a length, so
  /// the referenced characters must stay alive until they are written.
  struct PARQUET_EXPORT FixedStringView {
    FixedStringView() = default;

    /// NOTE(review): defined out of line; presumably the length is
    /// taken from the terminating NUL of data_ptr -- confirm.
    explicit FixedStringView(const char* data_ptr);

    FixedStringView(const char* data_ptr, std::size_t data_len);

    const char* data{NULLPTR};
    std::size_t size{0};
  };

  /// \brief Output operators for fixed length strings.
  ///
  /// For a character array the full extent N is written, so for a
  /// string literal the length includes the terminating NUL.
  template <int N>
  StreamWriter& operator<<(const char (&v)[N]) {
    return WriteFixedLength(v, N);
  }
  template <std::size_t N>
  StreamWriter& operator<<(const std::array<char, N>& v) {
    return WriteFixedLength(v.data(), N);
  }
  StreamWriter& operator<<(FixedStringView v);

  /// \brief Output operators for variable length strings.
  StreamWriter& operator<<(const char* v);
  StreamWriter& operator<<(const std::string& v);
  StreamWriter& operator<<(::std::string_view v);

  /// \brief Output operator for optional fields.
  ///
  /// If the optional holds a value, the value is written exactly as if
  /// it had been streamed directly (`writer << *v`); otherwise the next
  /// column, which must be optional, is skipped.
  template <typename T>
  StreamWriter& operator<<(const optional<T>& v) {
    if (v) {
      // Use the operator expression rather than the function-call
      // spelling `operator<<(*v)`: the latter only finds the member
      // overloads of this class (finding a class member disables
      // argument-dependent lookup), so optional<UserType> could not
      // reach a user supplied free operator<<(StreamWriter&, UserType).
      return *this << *v;
    }
    SkipOptionalColumn();
    return *this;
  }

  /// \brief Skip the next N columns of optional data.  If there are
  /// fewer than N columns remaining then the excess columns are
  /// ignored.
  /// \throws ParquetException if there is an attempt to skip any
  /// required column.
  /// \return Number of columns actually skipped.
  int64_t SkipColumns(int num_columns_to_skip);

  /// \brief Terminate the current row and advance to next one.
  /// \throws ParquetException if all columns in the row were not
  /// written or skipped.
  void EndRow();

  /// \brief Terminate the current row group and create new one.
  void EndRowGroup();

 protected:
  /// \brief Write a single value to the next column and advance the
  /// column index.
  ///
  /// The column writer is obtained from the current row group writer
  /// and cast to WriterType without any runtime check, so the caller
  /// is responsible for having validated the column type beforehand
  /// (NOTE(review): presumably via CheckColumn() -- confirm).
  ///
  /// \tparam WriterType Concrete column writer type for the column.
  /// \tparam T Value type accepted by WriterType::WriteBatch.
  /// \param v The value to write.
  /// \return *this, to allow chaining.
  template <typename WriterType, typename T>
  StreamWriter& Write(const T v) {
    auto writer = static_cast<WriterType*>(row_group_writer_->column(column_index_++));

    // One value, definition level 1 (value present), repetition level 0.
    writer->WriteBatch(kBatchSizeOne, &kDefLevelOne, &kRepLevelZero, &v);

    // Size tracking is only needed when a maximum row group size is set.
    if (max_row_group_size_ > 0) {
      row_group_size_ += writer->estimated_buffered_value_bytes();
    }
    return *this;
  }

  /// \brief Write variable length character data to the next column.
  StreamWriter& WriteVariableLength(const char* data_ptr, std::size_t data_len);

  /// \brief Write fixed length character data to the next column.
  StreamWriter& WriteFixedLength(const char* data_ptr, std::size_t data_len);

  /// \brief Check the next column against the given types.
  /// NOTE(review): defined out of line; presumably throws
  /// ParquetException on a mismatch and treats the default length of
  /// -1 as "no length to check" -- confirm.
  void CheckColumn(Type::type physical_type, ConvertedType::type converted_type,
                   int length = -1);

  /// \brief Skip the next column which must be optional.
  /// \throws ParquetException if the next column does not exist or is
  /// not optional.
  void SkipOptionalColumn();

  /// \brief Write a null to the given column writer.
  /// NOTE(review): defined out of line; presumably uses kDefLevelZero.
  void WriteNullValue(ColumnWriter* writer);

 private:
  using node_ptr_type = std::shared_ptr<schema::PrimitiveNode>;

  // Deleter that does nothing: row_group_writer_ merely refers to a
  // RowGroupWriter and must never delete it.
  struct null_deleter {
    void operator()(void*) {}
  };

  // Index of the next column to write in the current row.
  int32_t column_index_{0};
  // Row counter reported by current_row().
  int64_t current_row_{0};
  // Size estimate accumulated by Write() while size tracking is on.
  int64_t row_group_size_{0};
  // Per-writer limit, seeded from the static default at construction.
  int64_t max_row_group_size_{default_row_group_size_};

  std::unique_ptr<ParquetFileWriter> file_writer_;
  // Non-owning (see null_deleter).
  std::unique_ptr<RowGroupWriter, null_deleter> row_group_writer_;
  // NOTE(review): presumably one schema node per column -- confirm.
  std::vector<node_ptr_type> nodes_;

  static constexpr int16_t kDefLevelZero = 0;
  static constexpr int16_t kDefLevelOne = 1;
  static constexpr int16_t kRepLevelZero = 0;
  static constexpr int64_t kBatchSizeOne = 1;

  static int64_t default_row_group_size_;
};

/// \brief Tag type behind the EndRow output manipulator.
/// It carries no data; it only selects the
/// operator<<(StreamWriter&, EndRowType) overload.
struct PARQUET_EXPORT EndRowType {};
/// \brief Manipulator object, used as `writer << ... << EndRow`.
constexpr EndRowType EndRow{};

/// \brief Tag type behind the EndRowGroup output manipulator.
/// It carries no data; it only selects the
/// operator<<(StreamWriter&, EndRowGroupType) overload.
struct PARQUET_EXPORT EndRowGroupType {};
/// \brief Manipulator object, used as `writer << EndRowGroup`.
constexpr EndRowGroupType EndRowGroup{};

/// \brief Output manipulator overload selected by `os << EndRow`.
/// NOTE(review): defined out of line; presumably equivalent to calling
/// os.EndRow() and returning os -- confirm.
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter& os, EndRowType);

/// \brief Output manipulator overload selected by `os << EndRowGroup`.
/// NOTE(review): defined out of line; presumably equivalent to calling
/// os.EndRowGroup() and returning os -- confirm.
PARQUET_EXPORT
StreamWriter& operator<<(StreamWriter& os, EndRowGroupType);

}  // namespace parquet