// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "arrow/acero/type_fwd.h"
#include "arrow/acero/visibility.h"
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/expression.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"
namespace arrow {
// Using-declarations that make these compute-layer names visible directly in
// namespace `arrow`, so code in `arrow` (including the acero options below) can
// name them without the `compute::` qualifier.
using compute::Aggregate;
using compute::ExecBatch;
using compute::Expression;
using compute::literal;
using compute::Ordering;
using compute::SelectKOptions;
using compute::SortOptions;
// Forward declaration only: the `io_executor` members of the source options below
// hold an `arrow::internal::Executor*`, so the complete type is not needed here.
namespace internal {
class Executor;
} // namespace internal
namespace acero {
/// \brief Debugging hooks that a node's options may carry (ExecNodeOptions::debug_opts)
///
/// Only forward-declared in this header. This must not be used in release-mode.
struct DebugOptions;
/// \brief Shorthand for an AsyncGenerator that yields std::optional<ExecBatch>
///
/// NOTE(review): by analogy with SourceNodeOptions::generator ("an asynchronous
/// stream of batches ending with std::nullopt"), std::nullopt presumably marks the
/// end of the stream here too — confirm against users of this alias.
using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;
/// \addtogroup acero-nodes
/// @{
/// \brief A base class for all options objects
///
/// The only time this is used directly is when a node has no configuration
class ARROW_ACERO_EXPORT ExecNodeOptions {
public:
virtual ~ExecNodeOptions() = default;
/// \brief This must not be used in release-mode
std::shared_ptr<DebugOptions> debug_opts;
};
/// \brief A node representing a generic source of data for Acero
///
/// The source node will start calling `generator` during StartProducing. An initial
/// task will be created that will call `generator`. It will not call `generator`
/// reentrantly. If the source can be read in parallel then those details should be
/// encapsulated within `generator`.
///
/// For each batch received a new task will be created to push that batch downstream.
/// This task will slice smaller units of size `ExecPlan::kMaxBatchSize` from the
/// parent batch and call InputReceived. Thus, if the `generator` yields a large
/// batch it may result in several calls to InputReceived.
///
/// The SourceNode will, by default, assign an implicit ordering to outgoing batches.
/// This is valid as long as the generator generates batches in a deterministic fashion.
/// Currently, the only way to override this is to subclass the SourceNode.
///
/// NOTE(review): the paragraph above appears to predate the `ordering` option of
/// this class (whose default is Ordering::Unordered()). Confirm which ordering the
/// node actually reports when `ordering` is left at its default, and update the
/// paragraph accordingly.
///
/// This node is not generally used directly but can serve as the basis for various
/// specialized nodes.
class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance from values
  ///
  /// \param[in] output_schema the schema of every batch produced by `generator`
  /// \param[in] generator an asynchronous stream of batches ending with std::nullopt
  /// \param[in] ordering the ordering to declare for the produced data; defaults
  ///            to Ordering::Unordered()
  SourceNodeOptions(std::shared_ptr<Schema> output_schema,
                    std::function<Future<std::optional<ExecBatch>>()> generator,
                    Ordering ordering = Ordering::Unordered())
      : output_schema(std::move(output_schema)),
        generator(std::move(generator)),
        ordering(std::move(ordering)) {}

  /// \brief the schema for batches that will be generated by this source
  std::shared_ptr<Schema> output_schema;
  /// \brief an asynchronous stream of batches ending with std::nullopt
  std::function<Future<std::optional<ExecBatch>>()> generator;
  /// \brief the ordering declared for the data produced by `generator`
  ///
  /// Defaults to Ordering::Unordered(). The constructor always overwrites this
  /// member; the in-class initializer only mirrors the constructor's default.
  Ordering ordering = Ordering::Unordered();
};
/// \brief a node that generates data from a table already loaded in memory
///
/// The table source node will slice off chunks, defined by `max_batch_size`
/// for parallel processing. The table source node extends source node and so these
/// chunks will be iteratively processed in small batches. \see SourceNodeOptions
/// for details.
class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief default value of `max_batch_size` (1 << 20, i.e. 1048576)
  static constexpr int64_t kDefaultMaxBatchSize = 1 << 20;

  /// Create an instance from values
  ///
  /// \param[in] table the in-memory table to emit
  /// \param[in] max_batch_size upper bound on the size of each emitted batch
  TableSourceNodeOptions(std::shared_ptr<Table> table,
                         int64_t max_batch_size = kDefaultMaxBatchSize)
      : table(std::move(table)), max_batch_size(max_batch_size) {}

  /// \brief a table which acts as the data source
  std::shared_ptr<Table> table;
  /// \brief size of batches to emit from this node
  ///
  /// If the table is larger, the node will emit multiple batches from the table to
  /// be processed in parallel.
  int64_t max_batch_size;
};
/// \brief Describes an Arrow table that is resolved lazily, by name
///
/// `names` uniquely identify the table. Turning them into actual data can typically
/// be deferred until the plan is consumed.
///
/// These options exist for serialization only: a node built from them can never be
/// executed.
class ARROW_ACERO_EXPORT NamedTableNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance from values
  ///
  /// \param[in] names identifiers to record in the serialized plan
  /// \param[in] schema schema the resolved table is expected to have
  NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)
      : names(std::move(names)),
        schema(std::move(schema)) {}

  /// \brief identifiers written into the serialized plan
  std::vector<std::string> names;
  /// \brief schema of the table's output
  std::shared_ptr<Schema> schema;
};
/// \brief Source options for data pulled from a synchronous iterator of batches
///
/// `ItMaker` is a callable that makes an iterator over tabular data.
///
/// Polling may be moved onto an I/O executor. When it is, every poll of the iterator
/// becomes its own I/O thread task, so a blocking iterator never occupies the CPU
/// thread pool.
template <typename ItMaker>
class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance that schedules a new task on `io_executor` per iteration
  ///
  /// `requires_io` is always set to true by this constructor.
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          arrow::internal::Executor* io_executor)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(io_executor),
        requires_io(true) {}

  /// Create an instance that iterates synchronously (`requires_io == false`) or on
  /// the default I/O executor (`requires_io == true`)
  ///
  /// `io_executor` keeps its in-class default of NULLPTR.
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          bool requires_io = false)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        requires_io(requires_io) {}

  /// \brief Schema of the record batches the iterator yields
  std::shared_ptr<Schema> schema;
  /// \brief Callable producing the iterator that serves as the data source
  ItMaker it_maker;
  /// \brief Executor on which the iterator is scanned
  ///
  /// Only consulted when `requires_io` is true; NULLPTR then means "use the default
  /// I/O executor". When `requires_io` is false this MUST stay NULLPTR.
  arrow::internal::Executor* io_executor = NULLPTR;
  /// \brief Whether iteration happens on a dedicated I/O thread instead of the CPU
  /// thread
  bool requires_io = false;
};
/// \brief Source options for reading from a RecordBatchReader
///
/// Every iteration of the reader runs as a fresh thread task created on the I/O
/// thread pool.
class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
 public:
  /// Create an instance from values
  ///
  /// \param[in] reader the reader that supplies the batches
  /// \param[in] io_executor executor used for the reader; NULLPTR (the default)
  ///            means the default I/O executor
  RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
                                     arrow::internal::Executor* io_executor = NULLPTR)
      : reader(std::move(reader)),
        io_executor(io_executor) {}

  /// \brief Reader that acts as the data source
  std::shared_ptr<RecordBatchReader> reader;
  /// \brief Executor on which the reader is driven
  ///
  /// Defaults to the default I/O executor.
  arrow::internal::Executor* io_executor;
};
/// \brief Callable that creates an iterator over shared array vectors
///
/// Used as the ItMaker of ArrayVectorSourceNodeOptions.
using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;
/// \brief An extended Source node which accepts a schema and array-vectors
///
/// A source node that reads from an iterator of array vectors. All constructors are
/// inherited from SchemaSourceNodeOptions.
class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions
    : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
 public:
  // Spelled out as public for consistency with ExecBatchSourceNodeOptions; inherited
  // constructors keep the base class's access regardless of this specifier.
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};
/// \brief Callable that creates an iterator over shared exec batches
///
/// Used as the ItMaker of ExecBatchSourceNodeOptions.
using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;
/// \brief An extended Source node which accepts a schema and exec-batches
///
/// A source node that reads from an iterator of ExecBatch. Besides the constructors
/// inherited from SchemaSourceNodeOptions, it can be built directly from an
/// in-memory vector of batches.
class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
 public:
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;

  /// Create an instance from a vector of batches, scanned on `io_executor`
  ///
  /// NOTE(review): defined out of line; presumably mirrors the inherited
  /// (schema, it_maker, io_executor) constructor — confirm.
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches,
                             ::arrow::internal::Executor* io_executor);
  /// Create an instance from a vector of batches, iterated synchronously or on the
  /// default I/O executor depending on `requires_io`
  ///
  /// NOTE(review): defined out of line; presumably mirrors the inherited
  /// (schema, it_maker, requires_io) constructor — confirm.
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches, bool requires_io = false);
};
/// \brief Callable that creates an iterator over shared record batches
///
/// Used as the ItMaker of RecordBatchSourceNodeOptions.
using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;
/// \brief a source node that reads from an iterator of RecordBatch
///
/// All constructors are inherited from SchemaSourceNodeOptions.
class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
 public:
  // Spelled out as public for consistency with ExecBatchSourceNodeOptions; inherited
  // constructors keep the base class's access regardless of this specifier.
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};
/// \brief Options for a node that drops rows from the batches passing through it
///
/// `filter_expression` is evaluated against every batch pushed into the node. A row
/// is kept only when the expression evaluates to `true` for it; every other row is
/// left out of the batch the node emits.
///
/// A batch whose rows are all dropped is still emitted (empty), so the ordering has
/// no gaps.
class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// \param[in] filter_expression predicate deciding which rows survive
  explicit FilterNodeOptions(Expression filter_expression)
      : filter_expression(
            std::move(filter_expression)) {}

  /// \brief predicate applied to every incoming batch
  ///
  /// Must evaluate to a boolean.
  Expression filter_expression;
};
/// \brief a node which selects a specified subset from the input
///
/// The subset is defined by skipping the first `offset` rows and then keeping the
/// next `count` rows.
class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions {
 public:
  /// \brief the name of this node type ("fetch")
  static constexpr std::string_view kName = "fetch";

  /// \brief create an instance from values
  ///
  /// \param[in] offset number of leading rows to skip
  /// \param[in] count number of rows to keep after the skipped ones
  FetchNodeOptions(int64_t offset, int64_t count) : offset(offset), count(count) {}

  /// \brief the number of rows to skip
  int64_t offset;
  /// \brief the number of rows to keep (not counting skipped rows)
  int64_t count;
};
/// \brief Options for a node that computes new columns from its input batches
///
/// Every batch pushed into the node has each expression evaluated against it, and
/// each expression yields one output column. Output batches therefore have the same
/// length as the input batches.
///
/// When no names are supplied, the string form of each expression becomes its
/// column name.
class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// \param[in] expressions one expression per output column
  /// \param[in] names output column names; empty or the same length as `expressions`
  explicit ProjectNodeOptions(std::vector<Expression> expressions,
                              std::vector<std::string> names = {})
      : expressions(std::move(expressions)),
        names(std::move(names)) {}

  /// \brief expressions evaluated against each batch
  ///
  /// Each expression becomes exactly one output column. To carry an input column
  /// through unchanged, add a plain field_ref expression for it.
  std::vector<Expression> expressions;
  /// \brief output column names
  ///
  /// Leave empty to fall back to ToString() of each expression; otherwise the
  /// length should match `expressions`.
  std::vector<std::string> names;
};
/// \brief a node which aggregates input batches and calculates summary statistics
///
/// The node can summarize the entire input or it can group the input with grouping keys
/// and segment keys.
///
/// By default, the aggregate node is a pipeline breaker. It must accumulate all input
/// before any output is produced. Segment keys are a performance optimization. If
/// you know your input is already partitioned by one or more columns then you can
/// specify these as segment keys. At each change in the segment keys the node will
/// emit values for all data seen so far.
///
/// Segment keys are currently limited to single-threaded mode.
///
/// Both keys and segment-keys determine the group. However segment-keys are also used
/// for determining grouping segments, which should be large, and allow streaming a
/// partial aggregation result after processing each segment. One common use-case for
/// segment-keys is ordered aggregation, in which the segment-key attribute specifies a
/// column with non-decreasing values or a lexicographically-ordered set of such columns.
///
/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
/// expected to be a HashAggregate function. If the keys attribute is an empty vector,
/// then each aggregate is assumed to be a ScalarAggregate function.
///
/// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
/// described above, applies.
///
/// The keys and segment_keys vectors must be disjoint.
///
/// If no measures are provided then you will simply get the list of unique keys.
///
/// This node outputs segment keys first, followed by regular keys, followed by one
/// column for each aggregate.
class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
public:
/// \brief create an instance from values
explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
std::vector<FieldRef> keys = {},
std::vector<FieldRef> segment_keys = {})
: aggregates(std::move(aggregates)),
keys(std::move(keys)),
segment_keys(std::move(segment_keys)) {}
Loading ...