// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cassert>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>
#include "arrow/type.h" // for DataType, FieldRef, Field and Schema
namespace arrow {
using internal::checked_cast;
namespace acero {
// Identifiers for all different row schemas that are used in a join
//
enum class HashJoinProjection : int {
INPUT = 0,
KEY = 1,
PAYLOAD = 2,
FILTER = 3,
OUTPUT = 4
};
struct SchemaProjectionMap {
static constexpr int kMissingField = -1;
int num_cols;
const int* source_to_base;
const int* base_to_target;
inline int get(int i) const {
assert(i >= 0 && i < num_cols);
assert(source_to_base[i] != kMissingField);
return base_to_target[source_to_base[i]];
}
};
/// Helper class for managing different projections of the same row schema.
/// Used to efficiently map any field in one projection to a corresponding field in
/// another projection.
/// Materialized mappings are generated lazily at the time of the first access.
/// Thread-safe apart from initialization.
template <typename ProjectionIdEnum>
class SchemaProjectionMaps {
public:
static constexpr int kMissingField = -1;
Status Init(ProjectionIdEnum full_schema_handle, const Schema& schema,
const std::vector<ProjectionIdEnum>& projection_handles,
const std::vector<const std::vector<FieldRef>*>& projections) {
assert(projection_handles.size() == projections.size());
ARROW_RETURN_NOT_OK(RegisterSchema(full_schema_handle, schema));
for (size_t i = 0; i < projections.size(); ++i) {
ARROW_RETURN_NOT_OK(
RegisterProjectedSchema(projection_handles[i], *(projections[i]), schema));
}
RegisterEnd();
return Status::OK();
}
int num_cols(ProjectionIdEnum schema_handle) const {
int id = schema_id(schema_handle);
return static_cast<int>(schemas_[id].second.data_types.size());
}
bool is_empty(ProjectionIdEnum schema_handle) const {
return num_cols(schema_handle) == 0;
}
const std::string& field_name(ProjectionIdEnum schema_handle, int field_id) const {
int id = schema_id(schema_handle);
return schemas_[id].second.field_names[field_id];
}
const std::shared_ptr<DataType>& data_type(ProjectionIdEnum schema_handle,
int field_id) const {
int id = schema_id(schema_handle);
return schemas_[id].second.data_types[field_id];
}
const std::vector<std::shared_ptr<DataType>>& data_types(
ProjectionIdEnum schema_handle) const {
int id = schema_id(schema_handle);
return schemas_[id].second.data_types;
}
SchemaProjectionMap map(ProjectionIdEnum from, ProjectionIdEnum to) const {
int id_from = schema_id(from);
int id_to = schema_id(to);
SchemaProjectionMap result;
result.num_cols = num_cols(from);
result.source_to_base = mappings_[id_from].data();
result.base_to_target = inverse_mappings_[id_to].data();
return result;
}
protected:
struct FieldInfos {
std::vector<int> field_paths;
std::vector<std::string> field_names;
std::vector<std::shared_ptr<DataType>> data_types;
};
Status RegisterSchema(ProjectionIdEnum handle, const Schema& schema) {
FieldInfos out_fields;
const FieldVector& in_fields = schema.fields();
out_fields.field_paths.resize(in_fields.size());
out_fields.field_names.resize(in_fields.size());
out_fields.data_types.resize(in_fields.size());
for (size_t i = 0; i < in_fields.size(); ++i) {
const std::string& name = in_fields[i]->name();
const std::shared_ptr<DataType>& type = in_fields[i]->type();
out_fields.field_paths[i] = static_cast<int>(i);
out_fields.field_names[i] = name;
out_fields.data_types[i] = type;
}
schemas_.push_back(std::make_pair(handle, out_fields));
return Status::OK();
}
Status RegisterProjectedSchema(ProjectionIdEnum handle,
const std::vector<FieldRef>& selected_fields,
const Schema& full_schema) {
FieldInfos out_fields;
const FieldVector& in_fields = full_schema.fields();
out_fields.field_paths.resize(selected_fields.size());
out_fields.field_names.resize(selected_fields.size());
out_fields.data_types.resize(selected_fields.size());
for (size_t i = 0; i < selected_fields.size(); ++i) {
// All fields must be found in schema without ambiguity
ARROW_ASSIGN_OR_RAISE(auto match, selected_fields[i].FindOne(full_schema));
const std::string& name = in_fields[match[0]]->name();
const std::shared_ptr<DataType>& type = in_fields[match[0]]->type();
out_fields.field_paths[i] = match[0];
out_fields.field_names[i] = name;
out_fields.data_types[i] = type;
}
schemas_.push_back(std::make_pair(handle, out_fields));
return Status::OK();
}
void RegisterEnd() {
size_t size = schemas_.size();
mappings_.resize(size);
inverse_mappings_.resize(size);
int id_base = 0;
for (size_t i = 0; i < size; ++i) {
GenerateMapForProjection(static_cast<int>(i), id_base);
}
}
int schema_id(ProjectionIdEnum schema_handle) const {
for (size_t i = 0; i < schemas_.size(); ++i) {
if (schemas_[i].first == schema_handle) {
return static_cast<int>(i);
}
}
// We should never get here
assert(false);
return -1;
}
void GenerateMapForProjection(int id_proj, int id_base) {
int num_cols_proj = static_cast<int>(schemas_[id_proj].second.data_types.size());
int num_cols_base = static_cast<int>(schemas_[id_base].second.data_types.size());
std::vector<int>& mapping = mappings_[id_proj];
std::vector<int>& inverse_mapping = inverse_mappings_[id_proj];
mapping.resize(num_cols_proj);
inverse_mapping.resize(num_cols_base);
if (id_proj == id_base) {
for (int i = 0; i < num_cols_base; ++i) {
mapping[i] = inverse_mapping[i] = i;
}
} else {
const FieldInfos& fields_proj = schemas_[id_proj].second;
const FieldInfos& fields_base = schemas_[id_base].second;
for (int i = 0; i < num_cols_base; ++i) {
inverse_mapping[i] = SchemaProjectionMap::kMissingField;
}
for (int i = 0; i < num_cols_proj; ++i) {
int field_id = SchemaProjectionMap::kMissingField;
for (int j = 0; j < num_cols_base; ++j) {
if (fields_proj.field_paths[i] == fields_base.field_paths[j]) {
field_id = j;
// If there are multiple matches for the same input field,
// it will be mapped to the first match.
break;
}
}
assert(field_id != SchemaProjectionMap::kMissingField);
mapping[i] = field_id;
inverse_mapping[field_id] = i;
}
}
}
// vector used as a mapping from ProjectionIdEnum to fields
std::vector<std::pair<ProjectionIdEnum, FieldInfos>> schemas_;
std::vector<std::vector<int>> mappings_;
std::vector<std::vector<int>> inverse_mappings_;
};
using HashJoinProjectionMaps = SchemaProjectionMaps<HashJoinProjection>;
} // namespace acero
} // namespace arrow