Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev70 

/ include / arrow / acero / options.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <string_view>
#include <utility>
#include <vector>

#include "arrow/acero/type_fwd.h"
#include "arrow/acero/visibility.h"
#include "arrow/compute/api_aggregate.h"
#include "arrow/compute/api_vector.h"
#include "arrow/compute/exec.h"
#include "arrow/compute/expression.h"
#include "arrow/record_batch.h"
#include "arrow/result.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/async_util.h"

namespace arrow {

using compute::Aggregate;
using compute::ExecBatch;
using compute::Expression;
using compute::literal;
using compute::Ordering;
using compute::SelectKOptions;
using compute::SortOptions;

namespace internal {

// Forward declaration only: the options below hold `arrow::internal::Executor*`
// pointers, so the complete type is not required by this header.
class Executor;

}  // namespace internal

namespace acero {

/// \brief Shorthand for an asynchronous generator of `std::optional<ExecBatch>` values
using AsyncExecBatchGenerator = AsyncGenerator<std::optional<ExecBatch>>;

/// \brief Opaque debug configuration, only forward-declared here
///
/// This must not be used in release-mode
struct DebugOptions;

/// \addtogroup acero-nodes
/// @{

/// \brief A base class for all options objects
///
/// The only time this is used directly is when a node has no configuration
class ARROW_ACERO_EXPORT ExecNodeOptions {
 public:
  virtual ~ExecNodeOptions() = default;

  /// \brief This must not be used in release-mode
  std::shared_ptr<DebugOptions> debug_opts;
};

/// \brief A node representing a generic source of data for Acero
///
/// The source node will start calling `generator` during StartProducing.  An initial
/// task will be created that will call `generator`.  It will not call `generator`
/// reentrantly.  If the source can be read in parallel then those details should be
/// encapsulated within `generator`.
///
/// For each batch received a new task will be created to push that batch downstream.
/// This task will slice smaller units of size `ExecPlan::kMaxBatchSize` from the
/// parent batch and call InputReceived.  Thus, if the `generator` yields a large
/// batch it may result in several calls to InputReceived.
///
/// The SourceNode will, by default, assign an implicit ordering to outgoing batches.
/// This is valid as long as the generator generates batches in a deterministic fashion.
/// Currently, the only way to override this is to subclass the SourceNode.
///
/// This node is not generally used directly but can serve as the basis for various
/// specialized nodes.
class ARROW_ACERO_EXPORT SourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance from values
  ///
  /// \param output_schema schema shared by every batch the generator yields
  /// \param generator asynchronous batch producer; it signals exhaustion by
  ///        yielding std::nullopt
  /// \param ordering ordering declared for this source's output
  ///        (Ordering::Unordered() when omitted)
  SourceNodeOptions(std::shared_ptr<Schema> output_schema,
                    std::function<Future<std::optional<ExecBatch>>()> generator,
                    Ordering ordering = Ordering::Unordered())
      : output_schema(std::move(output_schema)),
        generator(std::move(generator)),
        ordering(std::move(ordering)) {}

  /// \brief the schema for batches that will be generated by this source
  std::shared_ptr<Schema> output_schema;

  /// \brief an asynchronous stream of batches ending with std::nullopt
  ///
  /// The node invokes this starting in StartProducing and never calls it
  /// reentrantly.
  std::function<Future<std::optional<ExecBatch>>()> generator;

  /// \brief the ordering declared for this source's output
  ///
  /// Defaults to Ordering::Unordered(), matching the constructor default.
  Ordering ordering = Ordering::Unordered();
};

/// \brief a node that generates data from a table already loaded in memory
///
/// The table source node will slice off chunks, defined by `max_batch_size`
/// for parallel processing.  The table source node extends source node and so these
/// chunks will be iteratively processed in small batches.  \see SourceNodeOptions
/// for details.
class ARROW_ACERO_EXPORT TableSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief the default value of `max_batch_size` (1 << 20)
  static constexpr int64_t kDefaultMaxBatchSize = 1 << 20;

  /// \brief Create an instance from values
  ///
  /// \param table the in-memory table which acts as the data source
  /// \param max_batch_size size of the batches emitted by this node
  ///
  /// NOTE(review): this constructor is not `explicit`, so a
  /// `std::shared_ptr<Table>` converts implicitly to these options.  That is
  /// kept for backward compatibility with existing callers.
  TableSourceNodeOptions(std::shared_ptr<Table> table,
                         int64_t max_batch_size = kDefaultMaxBatchSize)
      : table(std::move(table)), max_batch_size(max_batch_size) {}

  /// \brief a table which acts as the data source
  std::shared_ptr<Table> table;
  /// \brief size of batches to emit from this node
  ///
  /// If the table is larger, the node will emit multiple batches from the
  /// table to be processed in parallel.
  int64_t max_batch_size;
};

/// \brief define a lazily resolved Arrow table.
///
/// The table uniquely identified by the names can typically be resolved at the time when
/// the plan is to be consumed.
///
/// This node is for serialization purposes only and can never be executed.
class ARROW_ACERO_EXPORT NamedTableNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance from values
  ///
  /// \param names identifiers written into the serialized plan for this table
  /// \param schema the schema the named table is expected to produce
  NamedTableNodeOptions(std::vector<std::string> names, std::shared_ptr<Schema> schema)
      : names(std::move(names)),
        schema(std::move(schema)) {}

  /// \brief the names to put in the serialized plan
  std::vector<std::string> names;

  /// \brief the output schema of the table
  std::shared_ptr<Schema> schema;
};

/// \brief a source node which feeds data from a synchronous iterator of batches
///
/// ItMaker is a maker of an iterator of tabular data.
///
/// The node can be configured to use an I/O executor.  If set then each time the
/// iterator is polled a new I/O thread task will be created to do the polling.  This
/// allows a blocking iterator to stay off the CPU thread pool.
template <typename ItMaker>
class ARROW_ACERO_EXPORT SchemaSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance that either iterates synchronously or uses the
  /// default I/O executor
  ///
  /// \param schema schema of the batches produced by the iterator
  /// \param it_maker callable that creates the source iterator
  /// \param requires_io when true, iteration is moved onto the default I/O executor;
  ///        when false (the default), the iterator is polled synchronously
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          bool requires_io = false)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(NULLPTR),
        requires_io(requires_io) {}

  /// \brief Create an instance that schedules a new task on `io_executor` for
  /// each iteration
  ///
  /// \param schema schema of the batches produced by the iterator
  /// \param it_maker callable that creates the source iterator
  /// \param io_executor executor that runs each poll of the iterator; this
  ///        overload always sets `requires_io` to true
  SchemaSourceNodeOptions(std::shared_ptr<Schema> schema, ItMaker it_maker,
                          arrow::internal::Executor* io_executor)
      : schema(std::move(schema)),
        it_maker(std::move(it_maker)),
        io_executor(io_executor),
        requires_io(true) {}

  /// \brief The schema of the record batches from the iterator
  std::shared_ptr<Schema> schema;

  /// \brief A maker of an iterator which acts as the data source
  ItMaker it_maker;

  /// \brief The executor to use for scanning the iterator
  ///
  /// Consulted only when `requires_io` is true.  In that case a null value
  /// means the default I/O executor.  When `requires_io` is false, this MUST
  /// be nullptr.
  arrow::internal::Executor* io_executor;

  /// \brief Whether the iterator is fetched from on a dedicated I/O thread
  ///
  /// Setting this keeps potentially blocking iteration off the CPU thread.
  bool requires_io;
};

/// a source node that reads from a RecordBatchReader
///
/// Each iteration of the RecordBatchReader will be run on a new thread task created
/// on the I/O thread pool.
class ARROW_ACERO_EXPORT RecordBatchReaderSourceNodeOptions : public ExecNodeOptions {
 public:
  /// \brief Create an instance from values
  ///
  /// \param reader the reader whose batches feed the node
  /// \param io_executor executor for the per-iteration read tasks; NULLPTR
  ///        (the default) leaves the choice to the default I/O executor
  RecordBatchReaderSourceNodeOptions(std::shared_ptr<RecordBatchReader> reader,
                                     arrow::internal::Executor* io_executor = NULLPTR)
      : reader(std::move(reader)),
        io_executor(io_executor) {}

  /// \brief The RecordBatchReader which acts as the data source
  std::shared_ptr<RecordBatchReader> reader;

  /// \brief The executor to use for the reader
  ///
  /// Defaults to the default I/O executor.
  arrow::internal::Executor* io_executor;
};

/// \brief A callable that creates an iterator over shared array vectors
using ArrayVectorIteratorMaker = std::function<Iterator<std::shared_ptr<ArrayVector>>()>;

/// \brief An extended Source node which accepts a schema and array-vectors
///
/// A source node that reads from an iterator of array vectors.  All constructors
/// are inherited from SchemaSourceNodeOptions.
class ARROW_ACERO_EXPORT ArrayVectorSourceNodeOptions
    : public SchemaSourceNodeOptions<ArrayVectorIteratorMaker> {
 public:
  // Inherited constructors keep the base constructors' (public) access either
  // way.  The explicit `public:` matches ExecBatchSourceNodeOptions and makes
  // that access obvious to readers.
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// \brief A callable that creates an iterator over shared ExecBatch objects
using ExecBatchIteratorMaker = std::function<Iterator<std::shared_ptr<ExecBatch>>()>;

/// \brief An extended Source node which accepts a schema and exec-batches
///
/// A source node that reads from an iterator of ExecBatch.  Besides the
/// constructors inherited from SchemaSourceNodeOptions, it declares two
/// convenience constructors that accept a vector of batches directly.
class ARROW_ACERO_EXPORT ExecBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<ExecBatchIteratorMaker> {
 public:
  // Brings in the (schema, it_maker, ...) constructors of the base.
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;

  /// \brief Create an instance from an in-memory vector of batches
  ///
  /// Defined out of line.  The `requires_io` flag presumably has the same
  /// meaning as in SchemaSourceNodeOptions (TODO confirm against the definition).
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches, bool requires_io = false);

  /// \brief Create an instance from an in-memory vector of batches, using
  /// `io_executor`
  ///
  /// Defined out of line.  Presumably this mirrors the executor-taking
  /// SchemaSourceNodeOptions constructor (TODO confirm against the definition).
  ExecBatchSourceNodeOptions(std::shared_ptr<Schema> schema,
                             std::vector<ExecBatch> batches,
                             ::arrow::internal::Executor* io_executor);
};

/// \brief A callable that creates an iterator over shared RecordBatch objects
using RecordBatchIteratorMaker = std::function<Iterator<std::shared_ptr<RecordBatch>>()>;

/// \brief An extended Source node which accepts a schema and record batches
///
/// A source node that reads from an iterator of RecordBatch.  All constructors
/// are inherited from SchemaSourceNodeOptions.
class ARROW_ACERO_EXPORT RecordBatchSourceNodeOptions
    : public SchemaSourceNodeOptions<RecordBatchIteratorMaker> {
 public:
  // Inherited constructors keep the base constructors' (public) access either
  // way.  The explicit `public:` matches ExecBatchSourceNodeOptions and makes
  // that access obvious to readers.
  using SchemaSourceNodeOptions::SchemaSourceNodeOptions;
};

/// \brief a node which excludes some rows from batches passed through it
///
/// filter_expression will be evaluated against each batch which is pushed to
/// this node. Any rows for which filter_expression does not evaluate to `true` will be
/// excluded in the batch emitted by this node.
///
/// This node will emit empty batches if all rows are excluded.  This is done
/// to avoid gaps in the ordering.
class ARROW_ACERO_EXPORT FilterNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// \param filter_expression boolean predicate evaluated against every batch
  explicit FilterNodeOptions(Expression filter_expression)
      : filter_expression(std::move(filter_expression)) {}

  /// \brief the predicate used to filter batches
  ///
  /// It must evaluate to a boolean.  Rows for which it does not evaluate to
  /// `true` are excluded from the emitted batch.
  Expression filter_expression;
};

/// \brief a node which selects a specified subset from the input
class ARROW_ACERO_EXPORT FetchNodeOptions : public ExecNodeOptions {
 public:
  /// \brief the name of this node type
  static constexpr std::string_view kName = "fetch";

  /// \brief create an instance from values
  ///
  /// \param offset how many rows to skip
  /// \param count how many rows to keep after the skipped ones
  FetchNodeOptions(int64_t offset, int64_t count)
      : offset(offset),
        count(count) {}

  /// \brief the number of rows to skip
  int64_t offset;

  /// \brief the number of rows to keep (not counting skipped rows)
  int64_t count;
};

/// \brief a node which executes expressions on input batches, producing batches
/// of the same length with new columns.
///
/// Each expression will be evaluated against each batch which is pushed to
/// this node to produce a corresponding output column.
///
/// If names are not provided, the string representations of exprs will be used.
class ARROW_ACERO_EXPORT ProjectNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  ///
  /// \param expressions one expression per output column
  /// \param names optional output column names; either empty or the same
  ///        length as `expressions`
  explicit ProjectNodeOptions(std::vector<Expression> expressions,
                              std::vector<std::string> names = {})
      : expressions(std::move(expressions)),
        names(std::move(names)) {}

  /// \brief the expressions evaluated against each input batch
  ///
  /// Every expression produces exactly one output column.  Input columns are
  /// not forwarded implicitly: to keep one, include a simple field_ref
  /// expression that refers to it.
  std::vector<Expression> expressions;

  /// \brief the output column names
  ///
  /// This list should either be empty or have the same length as
  /// `expressions`.  When it is empty, each column is named by calling
  /// ToString on its expression.
  std::vector<std::string> names;
};

/// \brief a node which aggregates input batches and calculates summary statistics
///
/// The node can summarize the entire input or it can group the input with grouping keys
/// and segment keys.
///
/// By default, the aggregate node is a pipeline breaker.  It must accumulate all input
/// before any output is produced.  Segment keys are a performance optimization.  If
/// you know your input is already partitioned by one or more columns then you can
/// specify these as segment keys.  At each change in the segment keys the node will
/// emit values for all data seen so far.
///
/// Segment keys are currently limited to single-threaded mode.
///
/// Both keys and segment-keys determine the group.  However segment-keys are also used
/// for determining grouping segments, which should be large, and allow streaming a
/// partial aggregation result after processing each segment.  One common use-case for
/// segment-keys is ordered aggregation, in which the segment-key attribute specifies a
/// column with non-decreasing values or a lexicographically-ordered set of such columns.
///
/// If the keys attribute is a non-empty vector, then each aggregate in `aggregates` is
/// expected to be a HashAggregate function. If the keys attribute is an empty vector,
/// then each aggregate is assumed to be a ScalarAggregate function.
///
/// If the segment_keys attribute is a non-empty vector, then segmented aggregation, as
/// described above, applies.
///
/// The keys and segment_keys vectors must be disjoint.
///
/// If `aggregates` is empty (no measures are provided) then you will simply get the
/// list of unique keys.
///
/// This node outputs segment keys first, followed by regular keys, followed by one
/// column for each aggregate.
class ARROW_ACERO_EXPORT AggregateNodeOptions : public ExecNodeOptions {
 public:
  /// \brief create an instance from values
  explicit AggregateNodeOptions(std::vector<Aggregate> aggregates,
                                std::vector<FieldRef> keys = {},
                                std::vector<FieldRef> segment_keys = {})
      : aggregates(std::move(aggregates)),
        keys(std::move(keys)),
        segment_keys(std::move(segment_keys)) {}
Loading ...