// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
// This API is EXPERIMENTAL.
#pragma once
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
#include "arrow/compute/expression.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/util/async_generator_fwd.h"
#include "arrow/util/future.h"
#include "arrow/util/macros.h"
#include "arrow/util/mutex.h"
namespace arrow {
namespace internal {
class Executor;
} // namespace internal
namespace dataset {
using RecordBatchGenerator = std::function<Future<std::shared_ptr<RecordBatch>>()>;
/// \brief Description of a column to scan
struct ARROW_DS_EXPORT FragmentSelectionColumn {
/// \brief The path to the column to load
FieldPath path;
/// \brief The type of the column in the dataset schema
///
/// A format may choose to ignore this field completely. For example, when
/// reading from IPC the reader can just return the column in the data type
/// that is stored on disk. There is no point in doing anything special.
///
/// However, some formats may be capable of casting on the fly. For example,
/// when reading from CSV, if we know the target type of the column, we can
/// convert from string to the target type as we read.
DataType* requested_type;
};
/// \brief A list of columns that should be loaded from a fragment
///
/// The paths in this selection should be referring to the fragment schema. This class
/// contains a virtual destructor as it is expected evolution strategies will need to
/// extend this to add any information needed to later evolve the batches.
///
/// For example, in the basic evolution strategy, we keep track of which columns
/// were missing from the file so that we can fill those in with null when evolving.
class ARROW_DS_EXPORT FragmentSelection {
public:
explicit FragmentSelection(std::vector<FragmentSelectionColumn> columns)
: columns_(std::move(columns)) {}
virtual ~FragmentSelection() = default;
/// The columns that should be loaded from the fragment
const std::vector<FragmentSelectionColumn>& columns() const { return columns_; }
private:
std::vector<FragmentSelectionColumn> columns_;
};
/// \brief Instructions for scanning a particular fragment
///
/// The fragment scan request is derived from ScanV2Options. The main
/// difference is that the scan options are based on the dataset schema
/// while the fragment request is based on the fragment schema.
struct ARROW_DS_EXPORT FragmentScanRequest {
/// \brief A row filter
///
/// The filter expression should be written against the fragment schema.
///
/// \see ScanV2Options for details on how this filter should be applied
compute::Expression filter = compute::literal(true);
/// \brief The columns to scan
///
/// These indices refer to the fragment schema
///
/// Note: This is NOT a simple list of top-level column indices.
/// For more details \see ScanV2Options
///
/// If possible a fragment should only read from disk the data needed
/// to satisfy these columns. If a format cannot partially read a nested
/// column (e.g. JSON) then it must apply the column selection (in memory)
/// before returning the scanned batch.
std::shared_ptr<FragmentSelection> fragment_selection;
/// \brief Options specific to the format being scanned
const FragmentScanOptions* format_scan_options;
};
/// \brief An iterator-like object that can yield batches created from a fragment
class ARROW_DS_EXPORT FragmentScanner {
public:
/// This instance will only be destroyed after all ongoing scan futures
/// have been completed.
///
/// This means any callbacks created as part of the scan can safely
/// capture `this`
virtual ~FragmentScanner() = default;
/// \brief Scan a batch of data from the file
/// \param batch_number The index of the batch to read
virtual Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) = 0;
/// \brief Calculate an estimate of how many data bytes the given batch will represent
///
/// "Data bytes" should be the total size of all the buffers once the data has been
/// decoded into the Arrow format.
virtual int64_t EstimatedDataBytes(int batch_number) = 0;
/// \brief The number of batches in the fragment to scan
virtual int NumBatches() = 0;
};
/// \brief Information learned about a fragment through inspection
///
/// This information can be used to figure out which fields need
/// to be read from a file and how the data read in should be evolved
/// to match the dataset schema.
///
/// For example, from a CSV file we can inspect and learn the column
/// names and use those column names to determine which columns to load
/// from the CSV file.
struct ARROW_DS_EXPORT InspectedFragment {
explicit InspectedFragment(std::vector<std::string> column_names)
: column_names(std::move(column_names)) {}
std::vector<std::string> column_names;
};
/// \brief A granular piece of a Dataset, such as an individual file.
///
/// A Fragment can be read/scanned separately from other fragments. It yields a
/// collection of RecordBatches when scanned
///
/// Note that Fragments have well defined physical schemas which are reconciled by
/// the Datasets which contain them; these physical schemas may differ from a parent
/// Dataset's schema and the physical schemas of sibling Fragments.
class ARROW_DS_EXPORT Fragment : public std::enable_shared_from_this<Fragment> {
public:
/// \brief An expression that represents no known partition information
static const compute::Expression kNoPartitionInformation;
/// \brief Return the physical schema of the Fragment.
///
/// The physical schema is also called the writer schema.
/// This method is blocking and may suffer from high latency filesystem.
/// The schema is cached after being read once, or may be specified at construction.
Result<std::shared_ptr<Schema>> ReadPhysicalSchema();
/// An asynchronous version of Scan
virtual Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) = 0;
/// \brief Inspect a fragment to learn basic information
///
/// This will be called before a scan and a fragment should attach whatever
/// information will be needed to figure out an evolution strategy. This information
/// will then be passed to the call to BeginScan
virtual Future<std::shared_ptr<InspectedFragment>> InspectFragment(
const FragmentScanOptions* format_options, compute::ExecContext* exec_context);
/// \brief Start a scan operation
virtual Future<std::shared_ptr<FragmentScanner>> BeginScan(
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options, compute::ExecContext* exec_context);
/// \brief Count the number of rows in this fragment matching the filter using metadata
/// only. That is, this method may perform I/O, but will not load data.
///
/// If this is not possible, resolve with an empty optional. The fragment can perform
/// I/O (e.g. to read metadata) before it deciding whether it can satisfy the request.
virtual Future<std::optional<int64_t>> CountRows(
compute::Expression predicate, const std::shared_ptr<ScanOptions>& options);
virtual std::string type_name() const = 0;
virtual std::string ToString() const { return type_name(); }
/// \brief An expression which evaluates to true for all data viewed by this
/// Fragment.
const compute::Expression& partition_expression() const {
return partition_expression_;
}
virtual ~Fragment() = default;
protected:
Fragment() = default;
explicit Fragment(compute::Expression partition_expression,
std::shared_ptr<Schema> physical_schema);
virtual Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() = 0;
util::Mutex physical_schema_mutex_;
compute::Expression partition_expression_ = compute::literal(true);
std::shared_ptr<Schema> physical_schema_;
};
/// \brief Per-scan options for fragment(s) in a dataset.
///
/// These options are not intrinsic to the format or fragment itself, but do affect
/// the results of a scan. These are options which make sense to change between
/// repeated reads of the same dataset, such as format-specific conversion options
/// (that do not affect the schema).
///
/// \ingroup dataset-scanning
class ARROW_DS_EXPORT FragmentScanOptions {
public:
virtual std::string type_name() const = 0;
virtual std::string ToString() const { return type_name(); }
virtual ~FragmentScanOptions() = default;
};
/// \defgroup dataset-implementations Concrete implementations
///
/// @{
/// \brief A trivial Fragment that yields ScanTask out of a fixed set of
/// RecordBatch.
class ARROW_DS_EXPORT InMemoryFragment : public Fragment {
public:
class Scanner;
InMemoryFragment(std::shared_ptr<Schema> schema, RecordBatchVector record_batches,
compute::Expression = compute::literal(true));
explicit InMemoryFragment(RecordBatchVector record_batches,
compute::Expression = compute::literal(true));
Result<RecordBatchGenerator> ScanBatchesAsync(
const std::shared_ptr<ScanOptions>& options) override;
Future<std::optional<int64_t>> CountRows(
compute::Expression predicate,
const std::shared_ptr<ScanOptions>& options) override;
Future<std::shared_ptr<InspectedFragment>> InspectFragment(
const FragmentScanOptions* format_options,
compute::ExecContext* exec_context) override;
Future<std::shared_ptr<FragmentScanner>> BeginScan(
const FragmentScanRequest& request, const InspectedFragment& inspected_fragment,
const FragmentScanOptions* format_options,
compute::ExecContext* exec_context) override;
std::string type_name() const override { return "in-memory"; }
protected:
Result<std::shared_ptr<Schema>> ReadPhysicalSchemaImpl() override;
RecordBatchVector record_batches_;
};
/// @}
using FragmentGenerator = AsyncGenerator<std::shared_ptr<Fragment>>;
/// \brief Rules for converting the dataset schema to and from fragment schemas
class ARROW_DS_EXPORT FragmentEvolutionStrategy {
public:
/// This instance will only be destroyed when all scan operations for the
/// fragment have completed.
virtual ~FragmentEvolutionStrategy() = default;
/// \brief A guarantee that applies to all batches of this fragment
///
/// For example, if a fragment is missing one of the fields in the dataset
/// schema then a typical evolution strategy is to set that field to null.
///
/// So if the column at index 3 is missing then the guarantee is
/// FieldRef(3) == null
///
/// Individual field guarantees should be AND'd together and returned
/// as a single expression.
virtual Result<compute::Expression> GetGuarantee(
const std::vector<FieldPath>& dataset_schema_selection) const = 0;
/// \brief Return a fragment schema selection given a dataset schema selection
///
/// For example, if the user wants fields 2 & 4 of the dataset schema and
/// in this fragment the field 2 is missing and the field 4 is at index 1 then
/// this should return {1}
virtual Result<std::unique_ptr<FragmentSelection>> DevolveSelection(
const std::vector<FieldPath>& dataset_schema_selection) const = 0;
/// \brief Return a filter expression bound to the fragment schema given
/// a filter expression bound to the dataset schema
///
/// The dataset scan filter will first be simplified by the guarantee returned
/// by GetGuarantee. This means an evolution that only handles dropping or casting
/// fields doesn't need to do anything here except return the given filter.
///
/// On the other hand, an evolution that is doing some kind of aliasing will likely
/// need to convert field references in the filter to the aliased field references
/// where appropriate.
virtual Result<compute::Expression> DevolveFilter(
const compute::Expression& filter) const = 0;
/// \brief Convert a batch from the fragment schema to the dataset schema
///
/// Typically this involves casting columns from the data type stored on disk
/// to the data type of the dataset schema. For example, this fragment might
/// have columns stored as int32 and the dataset schema might have int64 for
/// the column. In this case we should cast the column from int32 to int64.
///
/// Note: A fragment may perform this cast as the data is read from disk. In
/// that case a cast might not be needed.
virtual Result<compute::ExecBatch> EvolveBatch(
const std::shared_ptr<RecordBatch>& batch,
const std::vector<FieldPath>& dataset_selection,
const FragmentSelection& selection) const = 0;
/// \brief Return a string description of this strategy
virtual std::string ToString() const = 0;
};
/// \brief Lookup to create a FragmentEvolutionStrategy for a given fragment
class ARROW_DS_EXPORT DatasetEvolutionStrategy {
public:
virtual ~DatasetEvolutionStrategy() = default;
/// \brief Create a strategy for evolving from the given fragment
/// to the schema of the given dataset
virtual std::unique_ptr<FragmentEvolutionStrategy> GetStrategy(
const Dataset& dataset, const Fragment& fragment,
const InspectedFragment& inspected_fragment) = 0;
/// \brief Return a string description of this strategy
virtual std::string ToString() const = 0;
};
ARROW_DS_EXPORT std::unique_ptr<DatasetEvolutionStrategy>
MakeBasicDatasetEvolutionStrategy();
Loading ...