Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ include / arrow / acero / schema_util.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <utility>
#include <vector>

#include "arrow/type.h"  // for DataType, FieldRef, Field and Schema

namespace arrow {

using internal::checked_cast;

namespace acero {

/// Handles naming each of the row schemas (projections) that a hash join works with.
/// The underlying integer values are consecutive, starting from 0 for INPUT.
enum class HashJoinProjection : int { INPUT = 0, KEY, PAYLOAD, FILTER, OUTPUT };

/// Lightweight view that translates a column index in a source projection into the
/// matching column index in a target projection, going through a shared base schema.
///
/// The two index arrays are borrowed, not owned: the struct only stores raw pointers
/// and never frees them.
struct SchemaProjectionMap {
  /// Sentinel stored in the index arrays for "this column has no counterpart".
  static constexpr int kMissingField = -1;
  /// Number of columns in the source projection (valid range for get()).
  int num_cols;
  /// source column index -> base column index.
  const int* source_to_base;
  /// base column index -> target column index (may hold kMissingField).
  const int* base_to_target;

  /// Translates source column `i` to the target projection. The source column must
  /// exist in the base; the result is kMissingField when the target lacks it.
  int get(int i) const {
    assert(i >= 0 && i < num_cols);
    const int base_index = source_to_base[i];
    assert(base_index != kMissingField);
    return base_to_target[base_index];
  }
};

/// Helper class for managing different projections of the same row schema.
/// Used to efficiently map any field in one projection to a corresponding field in
/// another projection.
///
/// The full schema passed to Init() is registered first and serves as the common
/// base. For every registered schema two index vectors are stored: a forward mapping
/// (projection column -> base column) and an inverse mapping (base column ->
/// projection column, or kMissingField when the projection does not select it).
/// map() composes one forward and one inverse mapping, so no pairwise tables are
/// needed.
///
/// All mappings are materialized eagerly by Init(). After a successful Init() the
/// object is only read by its const accessors, so it is thread-safe apart from
/// initialization. Init() must not run concurrently with any other access.
template <typename ProjectionIdEnum>
class SchemaProjectionMaps {
 public:
  static constexpr int kMissingField = -1;

  /// Registers `schema` under `full_schema_handle` as the base, then registers
  /// projection `i` (a list of field references into `schema`) under
  /// `projection_handles[i]`, and finally builds all mappings.
  ///
  /// Returns Invalid if the two vectors differ in length or a projection pointer is
  /// null. Returns the FieldRef lookup error if a reference is missing or ambiguous
  /// in `schema`.
  Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema,
              const std::vector<ProjectionIdEnum>& projection_handles,
              const std::vector<const std::vector<FieldRef>*>& projections) {
    // Validate in every build mode, before any state is modified: with asserts
    // compiled out, a size mismatch would index projection_handles out of bounds and
    // a null entry would be dereferenced.
    if (projection_handles.size() != projections.size()) {
      return Status::Invalid("SchemaProjectionMaps::Init: got ",
                             projection_handles.size(), " projection handles but ",
                             projections.size(), " projections");
    }
    for (size_t i = 0; i < projections.size(); ++i) {
      if (projections[i] == nullptr) {
        return Status::Invalid("SchemaProjectionMaps::Init: projection ", i,
                               " is null");
      }
    }
    ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema));
    for (size_t i = 0; i < projections.size(); ++i) {
      ARROW_RETURN_NOT_OK(
          RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema));
    }
    RegisterEnd();
    return Status::OK();
  }

  /// Number of columns in the schema registered under `schema_handle`.
  int num_cols(ProjectionIdEnum schema_handle) const {
    int id = schema_id(schema_handle);
    return static_cast<int>(schemas_[id].second.data_types.size());
  }

  /// True when the schema registered under `schema_handle` has no columns.
  bool is_empty(ProjectionIdEnum schema_handle) const {
    return num_cols(schema_handle) == 0;
  }

  /// Name of column `field_id` of the schema registered under `schema_handle`.
  const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const {
    int id = schema_id(schema_handle);
    return schemas_[id].second.field_names[field_id];
  }

  /// Type of column `field_id` of the schema registered under `schema_handle`.
  const std::shared_ptr<DataType>& data_type(ProjectionIdEnum schema_handle,
                                             int field_id) const {
    int id = schema_id(schema_handle);
    return schemas_[id].second.data_types[field_id];
  }

  /// Types of all columns of the schema registered under `schema_handle`.
  const std::vector<std::shared_ptr<DataType>>& data_types(
      ProjectionIdEnum schema_handle) const {
    int id = schema_id(schema_handle);
    return schemas_[id].second.data_types;
  }

  /// Returns a view translating column indices of `from` into column indices of
  /// `to`. The view points into this object's storage: it must not outlive this
  /// object and should be treated as invalid if Init() is called again.
  SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const {
    int id_from = schema_id(from);
    int id_to = schema_id(to);
    SchemaProjectionMap result;
    // Same value as num_cols(from), without resolving the handle a second time.
    result.num_cols = static_cast<int>(schemas_[id_from].second.data_types.size());
    result.source_to_base = mappings_[id_from].data();
    result.base_to_target = inverse_mappings_[id_to].data();
    return result;
  }

 protected:
  /// Per-schema column metadata, stored as parallel vectors indexed by column.
  struct FieldInfos {
    // Index of the top-level column of the full schema that each column comes from.
    std::vector<int> field_paths;
    std::vector<std::string> field_names;
    std::vector<std::shared_ptr<DataType>> data_types;
  };

  /// Registers every field of `schema`, in order, under `handle`. Column i keeps
  /// path i.
  Status RegisterSchema(ProjectionIdEnum handle, const Schema& schema) {
    FieldInfos out_fields;
    const FieldVector& in_fields = schema.fields();
    out_fields.field_paths.resize(in_fields.size());
    out_fields.field_names.resize(in_fields.size());
    out_fields.data_types.resize(in_fields.size());
    for (size_t i = 0; i < in_fields.size(); ++i) {
      const std::string& name = in_fields[i]->name();
      const std::shared_ptr<DataType>& type = in_fields[i]->type();
      out_fields.field_paths[i] = static_cast<int>(i);
      out_fields.field_names[i] = name;
      out_fields.data_types[i] = type;
    }
    // Move rather than copy: FieldInfos owns vectors of strings and shared_ptrs.
    schemas_.emplace_back(handle, std::move(out_fields));
    return Status::OK();
  }

  /// Registers the columns of `full_schema` selected by `selected_fields` under
  /// `handle`, in the order given.
  ///
  /// Only the top-level column of each match is recorded. For a reference that
  /// resolves to a nested path, the stored name, type and path are those of the
  /// outermost field (match[0]).
  Status RegisterProjectedSchema(ProjectionIdEnum handle,
                                 const std::vector<FieldRef>& selected_fields,
                                 const Schema& full_schema) {
    FieldInfos out_fields;
    const FieldVector& in_fields = full_schema.fields();
    out_fields.field_paths.resize(selected_fields.size());
    out_fields.field_names.resize(selected_fields.size());
    out_fields.data_types.resize(selected_fields.size());
    for (size_t i = 0; i < selected_fields.size(); ++i) {
      // All fields must be found in schema without ambiguity
      ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema));
      const std::string& name = in_fields[match[0]]->name();
      const std::shared_ptr<DataType>& type = in_fields[match[0]]->type();
      out_fields.field_paths[i] = match[0];
      out_fields.field_names[i] = name;
      out_fields.data_types[i] = type;
    }
    schemas_.emplace_back(handle, std::move(out_fields));
    return Status::OK();
  }

  /// Builds the forward and inverse mappings of every registered schema against
  /// schema 0, which Init() always registers first as the full schema.
  void RegisterEnd() {
    size_t size = schemas_.size();
    mappings_.resize(size);
    inverse_mappings_.resize(size);
    int id_base = 0;
    for (size_t i = 0; i < size; ++i) {
      GenerateMapForProjection(static_cast<int>(i), id_base);
    }
  }

  /// Index in schemas_ of the first schema registered under `schema_handle`
  /// (linear search). An unregistered handle trips the assert in debug builds and
  /// yields -1 otherwise, which callers would then use as an index.
  int schema_id(ProjectionIdEnum schema_handle) const {
    for (size_t i = 0; i < schemas_.size(); ++i) {
      if (schemas_[i].first == schema_handle) {
        return static_cast<int>(i);
      }
    }
    // We should never get here
    assert(false);
    return -1;
  }

  /// Fills mappings_[id_proj] (projection column -> base column) and
  /// inverse_mappings_[id_proj] (base column -> projection column). Base columns
  /// that the projection does not select map to kMissingField.
  void GenerateMapForProjection(int id_proj, int id_base) {
    int num_cols_proj = static_cast<int>(schemas_[id_proj].second.data_types.size());
    int num_cols_base = static_cast<int>(schemas_[id_base].second.data_types.size());

    std::vector<int>& mapping = mappings_[id_proj];
    std::vector<int>& inverse_mapping = inverse_mappings_[id_proj];
    mapping.resize(num_cols_proj);
    inverse_mapping.resize(num_cols_base);

    if (id_proj == id_base) {
      // The base maps onto itself: both directions are the identity.
      for (int i = 0; i < num_cols_base; ++i) {
        mapping[i] = inverse_mapping[i] = i;
      }
    } else {
      const FieldInfos& fields_proj = schemas_[id_proj].second;
      const FieldInfos& fields_base = schemas_[id_base].second;
      for (int i = 0; i < num_cols_base; ++i) {
        inverse_mapping[i] = SchemaProjectionMap::kMissingField;
      }
      for (int i = 0; i < num_cols_proj; ++i) {
        int field_id = SchemaProjectionMap::kMissingField;
        for (int j = 0; j < num_cols_base; ++j) {
          if (fields_proj.field_paths[i] == fields_base.field_paths[j]) {
            field_id = j;
            // If the base lists the same input field more than once,
            // the first match is used.
            break;
          }
        }
        assert(field_id != SchemaProjectionMap::kMissingField);
        mapping[i] = field_id;
        // If the projection selects the same base column several times, this
        // overwrite leaves the inverse mapping pointing at the last occurrence.
        inverse_mapping[field_id] = i;
      }
    }
  }

  // vector used as a mapping from ProjectionIdEnum to fields
  std::vector<std::pair<ProjectionIdEnum, FieldInfos>> schemas_;
  // mappings_[k]: column of schema k -> column of the base (schema 0).
  std::vector<std::vector<int>> mappings_;
  // inverse_mappings_[k]: column of the base -> column of schema k, or kMissingField.
  std::vector<std::vector<int>> inverse_mappings_;
};

using HashJoinProjectionMaps = SchemaProjectionMaps<HashJoinProjection>;

}  // namespace acero
}  // namespace arrow