Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ include / parquet / bloom_filter.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cmath>
#include <cstdint>
#include <memory>

#include "arrow/util/bit_util.h"
#include "arrow/util/logging.h"
#include "parquet/hasher.h"
#include "parquet/platform.h"
#include "parquet/types.h"

namespace parquet {

// A Bloom filter is a compact structure to indicate whether an item is not in a set or
// probably in a set. The Bloom filter usually consists of a bit set that represents a
// set of elements, a hash strategy and a Bloom filter algorithm.
class PARQUET_EXPORT BloomFilter {
 public:
  // Maximum Bloom filter size, it sets to HDFS default block size 128MB
  // This value will be reconsidered when implementing Bloom filter producer.
  static constexpr uint32_t kMaximumBloomFilterBytes = 128 * 1024 * 1024;

  /// Determine whether an element exist in set or not.
  ///
  /// @param hash the element to contain.
  /// @return false if value is definitely not in set, and true means PROBABLY
  /// in set.
  virtual bool FindHash(uint64_t hash) const = 0;

  /// Insert element to set represented by Bloom filter bitset.
  /// @param hash the hash of value to insert into Bloom filter.
  virtual void InsertHash(uint64_t hash) = 0;

  /// Insert elements to set represented by Bloom filter bitset.
  /// @param hashes the hash values to insert into Bloom filter.
  /// @param num_values the number of hash values to insert.
  virtual void InsertHashes(const uint64_t* hashes, int num_values) = 0;

  /// Write this Bloom filter to an output stream. A Bloom filter structure should
  /// include bitset length, hash strategy, algorithm, and bitset.
  ///
  /// @param sink the output stream to write
  virtual void WriteTo(ArrowOutputStream* sink) const = 0;

  /// Get the number of bytes of bitset
  virtual uint32_t GetBitsetSize() const = 0;

  /// Compute hash for 32 bits value by using its plain encoding result.
  ///
  /// @param value the value to hash.
  /// @return hash result.
  virtual uint64_t Hash(int32_t value) const = 0;

  /// Compute hash for 64 bits value by using its plain encoding result.
  ///
  /// @param value the value to hash.
  /// @return hash result.
  virtual uint64_t Hash(int64_t value) const = 0;

  /// Compute hash for float value by using its plain encoding result.
  ///
  /// @param value the value to hash.
  /// @return hash result.
  virtual uint64_t Hash(float value) const = 0;

  /// Compute hash for double value by using its plain encoding result.
  ///
  /// @param value the value to hash.
  /// @return hash result.
  virtual uint64_t Hash(double value) const = 0;

  /// Compute hash for Int96 value by using its plain encoding result.
  ///
  /// @param value the value to hash.
  /// @return hash result.
  virtual uint64_t Hash(const Int96* value) const = 0;

  /// Compute hash for ByteArray value by using its plain encoding result.
  ///
  /// @param value the value to hash.
  /// @return hash result.
  virtual uint64_t Hash(const ByteArray* value) const = 0;

  /// Compute hash for fixed byte array value by using its plain encoding result.
  ///
  /// @param value the value address.
  /// @param len the value length.
  /// @return hash result.
  virtual uint64_t Hash(const FLBA* value, uint32_t len) const = 0;

  /// Batch compute hashes for 32 bits values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const int32_t* values, int num_values, uint64_t* hashes) const = 0;

  /// Batch compute hashes for 64 bits values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const int64_t* values, int num_values, uint64_t* hashes) const = 0;

  /// Batch compute hashes for float values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const float* values, int num_values, uint64_t* hashes) const = 0;

  /// Batch compute hashes for double values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const double* values, int num_values, uint64_t* hashes) const = 0;

  /// Batch compute hashes for Int96 values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const Int96* values, int num_values, uint64_t* hashes) const = 0;

  /// Batch compute hashes for ByteArray values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const ByteArray* values, int num_values,
                      uint64_t* hashes) const = 0;

  /// Batch compute hashes for fixed byte array values by using its plain encoding result.
  ///
  /// @param values values a pointer to the values to hash.
  /// @param type_len the value length.
  /// @param num_values the number of values to hash.
  /// @param hashes a pointer to the output hash values, its length should be equal to
  /// num_values.
  virtual void Hashes(const FLBA* values, uint32_t type_len, int num_values,
                      uint64_t* hashes) const = 0;

  virtual ~BloomFilter() = default;

 protected:
  // Hash strategy available for Bloom filter.
  enum class HashStrategy : uint32_t { XXHASH = 0 };

  // Bloom filter algorithm.
  enum class Algorithm : uint32_t { BLOCK = 0 };

  enum class CompressionStrategy : uint32_t { UNCOMPRESSED = 0 };
};

/// The BlockSplitBloomFilter is implemented using block-based Bloom filters from
/// Putze et al.'s "Cache-,Hash- and Space-Efficient Bloom filters". The basic idea is to
/// hash the item to a tiny Bloom filter which size fit a single cache line or smaller.
///
/// This implementation sets 8 bits in each tiny Bloom filter. Each tiny Bloom
/// filter is 32 bytes to take advantage of 32-byte SIMD instructions.
class PARQUET_EXPORT BlockSplitBloomFilter : public BloomFilter {
 public:
  /// The constructor of BlockSplitBloomFilter. It uses XXH64 as hash function.
  ///
  /// \param pool memory pool to use.
  explicit BlockSplitBloomFilter(
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  /// Initialize the BlockSplitBloomFilter. The range of num_bytes should be within
  /// [kMinimumBloomFilterBytes, kMaximumBloomFilterBytes], it will be
  /// rounded up/down to lower/upper bound if num_bytes is out of range and also
  /// will be rounded up to a power of 2.
  ///
  /// @param num_bytes The number of bytes to store Bloom filter bitset.
  void Init(uint32_t num_bytes);

  /// Initialize the BlockSplitBloomFilter. It copies the bitset as underlying
  /// bitset because the given bitset may not satisfy the 32-byte alignment requirement
  /// which may lead to segfault when performing SIMD instructions. It is the caller's
  /// responsibility to free the bitset passed in. This is used when reconstructing
  /// a Bloom filter from a parquet file.
  ///
  /// @param bitset The given bitset to initialize the Bloom filter.
  /// @param num_bytes  The number of bytes of given bitset.
  void Init(const uint8_t* bitset, uint32_t num_bytes);

  /// Minimum Bloom filter size, it sets to 32 bytes to fit a tiny Bloom filter.
  static constexpr uint32_t kMinimumBloomFilterBytes = 32;

  /// Calculate optimal size according to the number of distinct values and false
  /// positive probability.
  ///
  /// @param ndv The number of distinct values.
  /// @param fpp The false positive probability.
  /// @return it always return a value between kMinimumBloomFilterBytes and
  /// kMaximumBloomFilterBytes, and the return value is always a power of 2
  static uint32_t OptimalNumOfBytes(uint32_t ndv, double fpp) {
    uint32_t optimal_num_of_bits = OptimalNumOfBits(ndv, fpp);
    ARROW_DCHECK(::arrow::bit_util::IsMultipleOf8(optimal_num_of_bits));
    return optimal_num_of_bits >> 3;
  }

  /// Calculate optimal size according to the number of distinct values and false
  /// positive probability.
  ///
  /// @param ndv The number of distinct values.
  /// @param fpp The false positive probability.
  /// @return it always return a value between kMinimumBloomFilterBytes * 8 and
  /// kMaximumBloomFilterBytes * 8, and the return value is always a power of 16
  static uint32_t OptimalNumOfBits(uint32_t ndv, double fpp) {
    ARROW_DCHECK(fpp > 0.0 && fpp < 1.0);
    const double m = -8.0 * ndv / log(1 - pow(fpp, 1.0 / 8));
    uint32_t num_bits;

    // Handle overflow.
    if (m < 0 || m > kMaximumBloomFilterBytes << 3) {
      num_bits = static_cast<uint32_t>(kMaximumBloomFilterBytes << 3);
    } else {
      num_bits = static_cast<uint32_t>(m);
    }

    // Round up to lower bound
    if (num_bits < kMinimumBloomFilterBytes << 3) {
      num_bits = kMinimumBloomFilterBytes << 3;
    }

    // Get next power of 2 if bits is not power of 2.
    if ((num_bits & (num_bits - 1)) != 0) {
      num_bits = static_cast<uint32_t>(::arrow::bit_util::NextPower2(num_bits));
    }

    // Round down to upper bound
    if (num_bits > kMaximumBloomFilterBytes << 3) {
      num_bits = kMaximumBloomFilterBytes << 3;
    }

    return num_bits;
  }

  bool FindHash(uint64_t hash) const override;
  void InsertHash(uint64_t hash) override;
  void InsertHashes(const uint64_t* hashes, int num_values) override;
  void WriteTo(ArrowOutputStream* sink) const override;
  uint32_t GetBitsetSize() const override { return num_bytes_; }

  uint64_t Hash(int32_t value) const override { return hasher_->Hash(value); }
  uint64_t Hash(int64_t value) const override { return hasher_->Hash(value); }
  uint64_t Hash(float value) const override { return hasher_->Hash(value); }
  uint64_t Hash(double value) const override { return hasher_->Hash(value); }
  uint64_t Hash(const Int96* value) const override { return hasher_->Hash(value); }
  uint64_t Hash(const ByteArray* value) const override { return hasher_->Hash(value); }
  uint64_t Hash(const FLBA* value, uint32_t len) const override {
    return hasher_->Hash(value, len);
  }

  void Hashes(const int32_t* values, int num_values, uint64_t* hashes) const override {
    hasher_->Hashes(values, num_values, hashes);
  }
  void Hashes(const int64_t* values, int num_values, uint64_t* hashes) const override {
    hasher_->Hashes(values, num_values, hashes);
  }
  void Hashes(const float* values, int num_values, uint64_t* hashes) const override {
    hasher_->Hashes(values, num_values, hashes);
  }
  void Hashes(const double* values, int num_values, uint64_t* hashes) const override {
    hasher_->Hashes(values, num_values, hashes);
  }
  void Hashes(const Int96* values, int num_values, uint64_t* hashes) const override {
    hasher_->Hashes(values, num_values, hashes);
  }
  void Hashes(const ByteArray* values, int num_values, uint64_t* hashes) const override {
    hasher_->Hashes(values, num_values, hashes);
  }
  void Hashes(const FLBA* values, uint32_t type_len, int num_values,
              uint64_t* hashes) const override {
    hasher_->Hashes(values, type_len, num_values, hashes);
  }

  uint64_t Hash(const int32_t* value) const { return hasher_->Hash(*value); }
  uint64_t Hash(const int64_t* value) const { return hasher_->Hash(*value); }
  uint64_t Hash(const float* value) const { return hasher_->Hash(*value); }
  uint64_t Hash(const double* value) const { return hasher_->Hash(*value); }

  /// Deserialize the Bloom filter from an input stream. It is used when reconstructing
  /// a Bloom filter from a parquet filter.
  ///
  /// @param properties The parquet reader properties.
  /// @param input_stream The input stream from which to construct the bloom filter.
  /// @param bloom_filter_length The length of the serialized bloom filter including
  /// header.
  /// @return The BlockSplitBloomFilter.
  static BlockSplitBloomFilter Deserialize(
      const ReaderProperties& properties, ArrowInputStream* input_stream,
      std::optional<int64_t> bloom_filter_length = std::nullopt);

 private:
  inline void InsertHashImpl(uint64_t hash);

  // Bytes in a tiny Bloom filter block.
  static constexpr int kBytesPerFilterBlock = 32;

  // The number of bits to be set in each tiny Bloom filter
  static constexpr int kBitsSetPerBlock = 8;

  // A mask structure used to set bits in each tiny Bloom filter.
  struct BlockMask {
    uint32_t item[kBitsSetPerBlock];
  };

  // The block-based algorithm needs eight odd SALT values to calculate eight indexes
  // of bit to set, one bit in each 32-bit word.
  static constexpr uint32_t SALT[kBitsSetPerBlock] = {
      0x47b6137bU, 0x44974d91U, 0x8824ad5bU, 0xa2b7289dU,
      0x705495c7U, 0x2df1424bU, 0x9efc4947U, 0x5c6bfb31U};

  // Memory pool to allocate aligned buffer for bitset
  ::arrow::MemoryPool* pool_;

  // The underlying buffer of bitset.
  std::shared_ptr<Buffer> data_;
Loading ...