// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <atomic>
#include <cmath>
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>
#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/config.h"
#include "arrow/util/functional.h"
#include "arrow/util/macros.h"
#include "arrow/util/tracing.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"
namespace arrow {
template <typename>
struct EnsureFuture;
namespace detail {
template <typename>
struct is_future : std::false_type {};
template <typename T>
struct is_future<Future<T>> : std::true_type {};
template <typename Signature, typename Enable = void>
struct result_of;
template <typename Fn, typename... A>
struct result_of<Fn(A...),
internal::void_t<decltype(std::declval<Fn>()(std::declval<A>()...))>> {
using type = decltype(std::declval<Fn>()(std::declval<A>()...));
};
template <typename Signature>
using result_of_t = typename result_of<Signature>::type;
// Helper to find the synchronous counterpart for a Future
template <typename T>
struct SyncType {
using type = Result<T>;
};
template <>
struct SyncType<internal::Empty> {
using type = Status;
};
template <typename Fn>
using first_arg_is_status =
std::is_same<typename std::decay<internal::call_traits::argument_type<0, Fn>>::type,
Status>;
template <typename Fn, typename Then, typename Else,
typename Count = internal::call_traits::argument_count<Fn>>
using if_has_no_args = typename std::conditional<Count::value == 0, Then, Else>::type;
/// Creates a callback that can be added to a future to mark a `dest` future finished
template <typename Source, typename Dest, bool SourceEmpty = Source::is_empty,
bool DestEmpty = Dest::is_empty>
struct MarkNextFinished {};
/// If the source and dest are both empty we can pass on the status
template <typename Source, typename Dest>
struct MarkNextFinished<Source, Dest, true, true> {
void operator()(const Status& status) && { next.MarkFinished(status); }
Dest next;
};
/// If the source is not empty but the dest is then we can take the
/// status out of the result
template <typename Source, typename Dest>
struct MarkNextFinished<Source, Dest, false, true> {
void operator()(const Result<typename Source::ValueType>& res) && {
next.MarkFinished(internal::Empty::ToResult(res.status()));
}
Dest next;
};
/// If neither are empty we pass on the result
template <typename Source, typename Dest>
struct MarkNextFinished<Source, Dest, false, false> {
void operator()(const Result<typename Source::ValueType>& res) && {
next.MarkFinished(res);
}
Dest next;
};
/// Helper that contains information about how to apply a continuation
struct ContinueFuture {
template <typename Return>
struct ForReturnImpl;
template <typename Return>
using ForReturn = typename ForReturnImpl<Return>::type;
template <typename Signature>
using ForSignature = ForReturn<result_of_t<Signature>>;
// If the callback returns void then we return Future<> that always finishes OK.
template <typename ContinueFunc, typename... Args,
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
typename NextFuture = ForReturn<ContinueResult>>
typename std::enable_if<std::is_void<ContinueResult>::value>::type operator()(
NextFuture next, ContinueFunc&& f, Args&&... a) const {
std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
next.MarkFinished();
}
/// If the callback returns a non-future then we return Future<T>
/// and mark the future finished with the callback result. It will get promoted
/// to Result<T> as part of MarkFinished if it isn't already.
///
/// If the callback returns Status and we return Future<> then also send the callback
/// result as-is to the destination future.
template <typename ContinueFunc, typename... Args,
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
typename NextFuture = ForReturn<ContinueResult>>
typename std::enable_if<
!std::is_void<ContinueResult>::value && !is_future<ContinueResult>::value &&
(!NextFuture::is_empty || std::is_same<ContinueResult, Status>::value)>::type
operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...));
}
/// If the callback returns a Result and the next future is Future<> then we mark
/// the future finished with the callback result.
///
/// It may seem odd that the next future is Future<> when the callback returns a
/// result but this can occur if the OnFailure callback returns a result while the
/// OnSuccess callback is void/Status (e.g. you would get this calling the one-arg
/// version of Then with an OnSuccess callback that returns void)
template <typename ContinueFunc, typename... Args,
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
typename NextFuture = ForReturn<ContinueResult>>
typename std::enable_if<!std::is_void<ContinueResult>::value &&
!is_future<ContinueResult>::value && NextFuture::is_empty &&
!std::is_same<ContinueResult, Status>::value>::type
operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status());
}
/// If the callback returns a Future<T> then we return Future<T>. We create a new
/// future and add a callback to the future given to us by the user that forwards the
/// result to the future we just created
template <typename ContinueFunc, typename... Args,
typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
typename NextFuture = ForReturn<ContinueResult>>
typename std::enable_if<is_future<ContinueResult>::value>::type operator()(
NextFuture next, ContinueFunc&& f, Args&&... a) const {
ContinueResult signal_to_complete_next =
std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
MarkNextFinished<ContinueResult, NextFuture> callback{std::move(next)};
signal_to_complete_next.AddCallback(std::move(callback));
}
/// Helpers to conditionally ignore arguments to ContinueFunc
template <typename ContinueFunc, typename NextFuture, typename... Args>
void IgnoringArgsIf(std::true_type, NextFuture&& next, ContinueFunc&& f,
Args&&...) const {
operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f));
}
template <typename ContinueFunc, typename NextFuture, typename... Args>
void IgnoringArgsIf(std::false_type, NextFuture&& next, ContinueFunc&& f,
Args&&... a) const {
operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f),
std::forward<Args>(a)...);
}
};
/// Helper struct which tells us what kind of Future gets returned from `Then` based on
/// the return type of the OnSuccess callback
template <>
struct ContinueFuture::ForReturnImpl<void> {
using type = Future<>;
};
template <>
struct ContinueFuture::ForReturnImpl<Status> {
using type = Future<>;
};
template <typename R>
struct ContinueFuture::ForReturnImpl {
using type = Future<R>;
};
template <typename T>
struct ContinueFuture::ForReturnImpl<Result<T>> {
using type = Future<T>;
};
template <typename T>
struct ContinueFuture::ForReturnImpl<Future<T>> {
using type = Future<T>;
};
} // namespace detail
/// A Future's execution or completion status
enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };
inline bool IsFutureFinished(FutureState state) { return state != FutureState::PENDING; }
/// \brief Describe whether the callback should be scheduled or run synchronously
enum class ShouldSchedule {
/// Always run the callback synchronously (the default)
Never = 0,
/// Schedule a new task only if the future is not finished when the
/// callback is added
IfUnfinished = 1,
/// Always schedule the callback as a new task
Always = 2,
/// Schedule a new task only if it would run on an executor other than
/// the specified executor.
IfDifferentExecutor = 3,
};
/// \brief Options that control how a continuation is run
struct CallbackOptions {
/// Describe whether the callback should be run synchronously or scheduled
ShouldSchedule should_schedule = ShouldSchedule::Never;
/// If the callback is scheduled then this is the executor it should be scheduled
/// on. If this is NULL then should_schedule must be Never
internal::Executor* executor = NULLPTR;
static CallbackOptions Defaults() { return {}; }
};
// Untyped private implementation
class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this<FutureImpl> {
public:
FutureImpl();
virtual ~FutureImpl() = default;
FutureState state() { return state_.load(); }
static std::unique_ptr<FutureImpl> Make();
static std::unique_ptr<FutureImpl> MakeFinished(FutureState state);
#ifdef ARROW_WITH_OPENTELEMETRY
void SetSpan(util::tracing::Span* span) { span_ = span; }
#endif
// Future API
void MarkFinished();
void MarkFailed();
void Wait();
bool Wait(double seconds);
template <typename ValueType>
Result<ValueType>* CastResult() const {
return static_cast<Result<ValueType>*>(result_.get());
}
using Callback = internal::FnOnce<void(const FutureImpl& impl)>;
void AddCallback(Callback callback, CallbackOptions opts);
bool TryAddCallback(const std::function<Callback()>& callback_factory,
CallbackOptions opts);
std::atomic<FutureState> state_{FutureState::PENDING};
// Type erased storage for arbitrary results
// XXX small objects could be stored inline instead of boxed in a pointer
using Storage = std::unique_ptr<void, void (*)(void*)>;
Storage result_{NULLPTR, NULLPTR};
struct CallbackRecord {
Callback callback;
CallbackOptions options;
};
std::vector<CallbackRecord> callbacks_;
#ifdef ARROW_WITH_OPENTELEMETRY
util::tracing::Span* span_ = NULLPTR;
#endif
};
// ---------------------------------------------------------------------
// Public API
/// \brief EXPERIMENTAL A std::future-like class with more functionality.
///
/// A Future represents the results of a past or future computation.
/// The Future API has two sides: a producer side and a consumer side.
///
/// The producer API allows creating a Future and setting its result or
/// status, possibly after running a computation function.
///
/// The consumer API allows querying a Future's current state, wait for it
/// to complete, and composing futures with callbacks.
template <typename T>
class [[nodiscard]] Future {
public:
using ValueType = T;
using SyncType = typename detail::SyncType<T>::type;
static constexpr bool is_empty = std::is_same<T, internal::Empty>::value;
// The default constructor creates an invalid Future. Use Future::Make()
// for a valid Future. This constructor is mostly for the convenience
// of being able to presize a vector of Futures.
Future() = default;
#ifdef ARROW_WITH_OPENTELEMETRY
void SetSpan(util::tracing::Span* span) { impl_->SetSpan(span); }
#endif
// Consumer API
bool is_valid() const { return impl_ != NULLPTR; }
/// \brief Return the Future's current state
///
/// A return value of PENDING is only indicative, as the Future can complete
/// concurrently. A return value of FAILURE or SUCCESS is definitive, though.
FutureState state() const {
CheckValid();
return impl_->state();
}
/// \brief Whether the Future is finished
///
/// A false return value is only indicative, as the Future can complete
/// concurrently. A true return value is definitive, though.
bool is_finished() const {
CheckValid();
return IsFutureFinished(impl_->state());
}
/// \brief Wait for the Future to complete and return its Result
const Result<ValueType>& result() const& {
Wait();
return *GetResult();
}
/// \brief Returns an rvalue to the result. This method is potentially unsafe
///
/// The future is not the unique owner of the result, copies of a future will
/// also point to the same result. You must make sure that no other copies
/// of the future exist. Attempts to add callbacks after you move the result
/// will result in undefined behavior.
Result<ValueType>&& MoveResult() {
Wait();
return std::move(*GetResult());
}
/// \brief Wait for the Future to complete and return its Status
const Status& status() const { return result().status(); }
/// \brief Future<T> is convertible to Future<>, which views only the
/// Status of the original. Marking the returned Future Finished is not supported.
explicit operator Future<>() const {
Future<> status_future;
status_future.impl_ = impl_;
return status_future;
}
/// \brief Wait for the Future to complete
void Wait() const {
CheckValid();
impl_->Wait();
}
/// \brief Wait for the Future to complete, or for the timeout to expire
///
/// `true` is returned if the Future completed, `false` if the timeout expired.
/// Note a `false` value is only indicative, as the Future can complete
/// concurrently.
bool Wait(double seconds) const {
CheckValid();
return impl_->Wait(seconds);
}
// Producer API
/// \brief Producer API: mark Future finished
///
/// The Future's result is set to `res`.
void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); }
/// \brief Mark a Future<> completed with the provided Status.
template <typename E = ValueType, typename = typename std::enable_if<
std::is_same<E, internal::Empty>::value>::type>
void MarkFinished(Status s = Status::OK()) {
return DoMarkFinished(E::ToResult(std::move(s)));
}
/// \brief Producer API: instantiate a valid Future
///
/// The Future's state is initialized with PENDING. If you are creating a future with
/// this method you must ensure that future is eventually completed (with success or
/// failure). Creating a future, returning it, and never completing the future can lead
/// to memory leaks (for example, see Loop).
static Future Make() {
Future fut;
fut.impl_ = FutureImpl::Make();
return fut;
}
/// \brief Producer API: instantiate a finished Future
static Future<ValueType> MakeFinished(Result<ValueType> res) {
Future<ValueType> fut;
fut.InitializeFromResult(std::move(res));
return fut;
}
/// \brief Make a finished Future<> with the provided Status.
template <typename E = ValueType, typename = typename std::enable_if<
std::is_same<E, internal::Empty>::value>::type>
static Future<> MakeFinished(Status s = Status::OK()) {
return MakeFinished(E::ToResult(std::move(s)));
}
struct WrapResultOnComplete {
template <typename OnComplete>
struct Callback {
void operator()(const FutureImpl& impl) && {
std::move(on_complete)(*impl.CastResult<ValueType>());
}
OnComplete on_complete;
};
};
struct WrapStatusyOnComplete {
template <typename OnComplete>
struct Callback {
static_assert(std::is_same<internal::Empty, ValueType>::value,
"Only callbacks for Future<> should accept Status and not Result");
void operator()(const FutureImpl& impl) && {
std::move(on_complete)(impl.CastResult<ValueType>()->status());
}
OnComplete on_complete;
};
};
template <typename OnComplete>
using WrapOnComplete = typename std::conditional<
detail::first_arg_is_status<OnComplete>::value, WrapStatusyOnComplete,
WrapResultOnComplete>::type::template Callback<OnComplete>;
/// \brief Consumer API: Register a callback to run when this future completes
///
/// The callback should receive the result of the future (const Result<T>&)
/// For a void or statusy future this should be (const Status&)
///
/// There is no guarantee to the order in which callbacks will run. In
/// particular, callbacks added while the future is being marked complete
/// may be executed immediately, ahead of, or even the same time as, other
/// callbacks that have been previously added.
///
/// WARNING: callbacks may hold arbitrary references, including cyclic references.
/// Since callbacks will only be destroyed after they are invoked, this can lead to
/// memory leaks if a Future is never marked finished (abandoned):
///
/// {
/// auto fut = Future<>::Make();
/// fut.AddCallback([fut]() {});
/// }
///
/// In this example `fut` falls out of scope but is not destroyed because it holds a
/// cyclic reference to itself through the callback.
template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>>
void AddCallback(OnComplete on_complete,
CallbackOptions opts = CallbackOptions::Defaults()) const {
// We know impl_ will not be dangling when invoking callbacks because at least one
// thread will be waiting for MarkFinished to return. Thus it's safe to keep a
// weak reference to impl_ here
impl_->AddCallback(Callback{std::move(on_complete)}, opts);
}
/// \brief Overload of AddCallback that will return false instead of running
/// synchronously
///
/// This overload will guarantee the callback is never run synchronously. If the future
/// is already finished then it will simply return false. This can be useful to avoid
/// stack overflow in a situation where you have recursive Futures. For an example
/// see the Loop function
///
/// Takes in a callback factory function to allow moving callbacks (the factory function
/// will only be called if the callback can successfully be added)
///
/// Returns true if a callback was actually added and false if the callback failed
/// to add because the future was marked complete.
template <typename CallbackFactory,
typename OnComplete = detail::result_of_t<CallbackFactory()>,
typename Callback = WrapOnComplete<OnComplete>>
bool TryAddCallback(CallbackFactory callback_factory,
CallbackOptions opts = CallbackOptions::Defaults()) const {
return impl_->TryAddCallback([&]() { return Callback{callback_factory()}; }, opts);
}
template <typename OnSuccess, typename OnFailure>
struct ThenOnComplete {
static constexpr bool has_no_args =
internal::call_traits::argument_count<OnSuccess>::value == 0;
using ContinuedFuture = detail::ContinueFuture::ForSignature<
detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;
static_assert(
std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
ContinuedFuture>::value,
"OnSuccess and OnFailure must continue with the same future type");
struct DummyOnSuccess {
void operator()(const T&);
};
using OnSuccessArg = typename std::decay<internal::call_traits::argument_type<
0, detail::if_has_no_args<OnSuccess, DummyOnSuccess, OnSuccess>>>::type;
static_assert(
!std::is_same<OnSuccessArg, typename EnsureResult<OnSuccessArg>::type>::value,
"OnSuccess' argument should not be a Result");
void operator()(const Result<T>& result) && {
detail::ContinueFuture continue_future;
if (ARROW_PREDICT_TRUE(result.ok())) {
// move on_failure to a(n immediately destroyed) temporary to free its resources
ARROW_UNUSED(OnFailure(std::move(on_failure)));
continue_future.IgnoringArgsIf(
detail::if_has_no_args<OnSuccess, std::true_type, std::false_type>{},
std::move(next), std::move(on_success), result.ValueOrDie());
} else {
ARROW_UNUSED(OnSuccess(std::move(on_success)));
continue_future(std::move(next), std::move(on_failure), result.status());
}
}
OnSuccess on_success;
OnFailure on_failure;
ContinuedFuture next;
};
template <typename OnSuccess>
struct PassthruOnFailure {
using ContinuedFuture = detail::ContinueFuture::ForSignature<
detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;
Result<typename ContinuedFuture::ValueType> operator()(const Status& s) { return s; }
};
/// \brief Consumer API: Register a continuation to run when this future completes
///
/// The continuation will run in the same thread that called MarkFinished (whatever
/// callback is registered with this function will run before MarkFinished returns).
/// Avoid long-running callbacks in favor of submitting a task to an Executor and
/// returning the future.
///
/// Two callbacks are supported:
/// - OnSuccess, called with the result (const ValueType&) on successful completion.
/// for an empty future this will be called with nothing ()
/// - OnFailure, called with the error (const Status&) on failed completion.
/// This callback is optional and defaults to a passthru of any errors.
///
/// Then() returns a Future whose ValueType is derived from the return type of the
/// callbacks. If a callback returns:
/// - void, a Future<> will be returned which will completes successfully as soon
/// as the callback runs.
/// - Status, a Future<> will be returned which will complete with the returned Status
/// as soon as the callback runs.
/// - V or Result<V>, a Future<V> will be returned which will complete with the result
/// of invoking the callback as soon as the callback runs.
/// - Future<V>, a Future<V> will be returned which will be marked complete when the
/// future returned by the callback completes (and will complete with the same
/// result).
///
/// The continued Future type must be the same for both callbacks.
///
/// Note that OnFailure can swallow errors, allowing continued Futures to successfully
/// complete even if this Future fails.
///
/// If this future is already completed then the callback will be run immediately
/// and the returned future may already be marked complete.
///
/// See AddCallback for general considerations when writing callbacks.
template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>,
typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>,
typename ContinuedFuture = typename OnComplete::ContinuedFuture>
ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {},
CallbackOptions options = CallbackOptions::Defaults()) const {
auto next = ContinuedFuture::Make();
AddCallback(OnComplete{std::forward<OnSuccess>(on_success),
std::forward<OnFailure>(on_failure), next},
options);
return next;
}
/// \brief Implicit constructor to create a finished future from a value
Future(ValueType val) : Future() { // NOLINT runtime/explicit
impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
SetResult(std::move(val));
}
/// \brief Implicit constructor to create a future from a Result, enabling use
/// of macros like ARROW_ASSIGN_OR_RAISE.
Future(Result<ValueType> res) : Future() { // NOLINT runtime/explicit
if (ARROW_PREDICT_TRUE(res.ok())) {
impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
} else {
impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
}
SetResult(std::move(res));
}
/// \brief Implicit constructor to create a future from a Status, enabling use
/// of macros like ARROW_RETURN_NOT_OK.
Future(Status s) // NOLINT runtime/explicit
: Future(Result<ValueType>(std::move(s))) {}
protected:
void InitializeFromResult(Result<ValueType> res) {
if (ARROW_PREDICT_TRUE(res.ok())) {
impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
} else {
impl_ = FutureImpl::MakeFinished(FutureState::FAILURE);
}
SetResult(std::move(res));
}
void Initialize() { impl_ = FutureImpl::Make(); }
Result<ValueType>* GetResult() const { return impl_->CastResult<ValueType>(); }
void SetResult(Result<ValueType> res) {
impl_->result_ = {new Result<ValueType>(std::move(res)),
[](void* p) { delete static_cast<Result<ValueType>*>(p); }};
}
void DoMarkFinished(Result<ValueType> res) {
SetResult(std::move(res));
if (ARROW_PREDICT_TRUE(GetResult()->ok())) {
impl_->MarkFinished();
} else {
impl_->MarkFailed();
}
}
void CheckValid() const {
#ifndef NDEBUG
if (!is_valid()) {
Status::Invalid("Invalid Future (default-initialized?)").Abort();
}
#endif
}
explicit Future(std::shared_ptr<FutureImpl> impl) : impl_(std::move(impl)) {}
std::shared_ptr<FutureImpl> impl_;
friend struct detail::ContinueFuture;
template <typename U>
friend class Future;
friend class WeakFuture<T>;
FRIEND_TEST(FutureRefTest, ChainRemoved);
FRIEND_TEST(FutureRefTest, TailRemoved);
FRIEND_TEST(FutureRefTest, HeadRemoved);
};
template <typename T>
typename Future<T>::SyncType FutureToSync(const Future<T>& fut) {
return fut.result();
}
template <>
inline typename Future<internal::Empty>::SyncType FutureToSync<internal::Empty>(
const Future<internal::Empty>& fut) {
return fut.status();
}
template <>
inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}
template <typename T>
class WeakFuture {
public:
explicit WeakFuture(const Future<T>& future) : impl_(future.impl_) {}
Future<T> get() { return Future<T>{impl_.lock()}; }
private:
std::weak_ptr<FutureImpl> impl_;
};
/// \defgroup future-utilities Functions for working with Futures
/// @{
/// If a Result<Future> holds an error instead of a Future, construct a finished Future
/// holding that error.
template <typename T>
static Future<T> DeferNotOk(Result<Future<T>> maybe_future) {
if (ARROW_PREDICT_FALSE(!maybe_future.ok())) {
return Future<T>::MakeFinished(std::move(maybe_future).status());
}
return std::move(maybe_future).MoveValueUnsafe();
}
/// \brief Create a Future which completes when all of `futures` complete.
///
/// The future's result is a vector of the results of `futures`.
/// Note that this future will never be marked "failed"; failed results
/// will be stored in the result vector alongside successful results.
template <typename T>
Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
struct State {
explicit State(std::vector<Future<T>> f)
: futures(std::move(f)), n_remaining(futures.size()) {}
std::vector<Future<T>> futures;
std::atomic<size_t> n_remaining;
};
if (futures.size() == 0) {
return {std::vector<Result<T>>{}};
}
auto state = std::make_shared<State>(std::move(futures));
auto out = Future<std::vector<Result<T>>>::Make();
for (const Future<T>& future : state->futures) {
future.AddCallback([state, out](const Result<T>&) mutable {
if (state->n_remaining.fetch_sub(1) != 1) return;
std::vector<Result<T>> results(state->futures.size());
for (size_t i = 0; i < results.size(); ++i) {
results[i] = state->futures[i].result();
}
out.MarkFinished(std::move(results));
});
}
return out;
}
/// \brief Create a Future which completes when all of `futures` complete.
///
/// The future will be marked complete if all `futures` complete
/// successfully. Otherwise, it will be marked failed with the status of
/// the first failing future.
ARROW_EXPORT
Future<> AllComplete(const std::vector<Future<>>& futures);
/// \brief Create a Future which completes when all of `futures` complete.
///
/// The future will finish with an ok status if all `futures` finish with
/// an ok status. Otherwise, it will be marked failed with the status of
/// one of the failing futures.
///
/// Unlike AllComplete this Future will not complete immediately when a
/// failure occurs. It will wait until all futures have finished.
ARROW_EXPORT
Future<> AllFinished(const std::vector<Future<>>& futures);
/// @}
struct Continue {
template <typename T>
operator std::optional<T>() && { // NOLINT explicit
return {};
}
};
template <typename T = internal::Empty>
std::optional<T> Break(T break_value = {}) {
return std::optional<T>{std::move(break_value)};
}
template <typename T = internal::Empty>
using ControlFlow = std::optional<T>;
/// \brief Loop through an asynchronous sequence
///
/// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On completion
/// of each yielded future the resulting ControlFlow will be examined. A Break will
/// terminate the loop, while a Continue will re-invoke `iterate`.
///
/// \return A future which will complete when a Future returned by iterate completes with
/// a Break
template <typename Iterate,
typename Control = typename detail::result_of_t<Iterate()>::ValueType,
typename BreakValueType = typename Control::value_type>
Future<BreakValueType> Loop(Iterate iterate) {
struct Callback {
bool CheckForTermination(const Result<Control>& control_res) {
if (!control_res.ok()) {
break_fut.MarkFinished(control_res.status());
return true;
}
if (control_res->has_value()) {
break_fut.MarkFinished(**control_res);
return true;
}
return false;
}
void operator()(const Result<Control>& maybe_control) && {
if (CheckForTermination(maybe_control)) return;
auto control_fut = iterate();
while (true) {
if (control_fut.TryAddCallback([this]() { return *this; })) {
// Adding a callback succeeded; control_fut was not finished
// and we must wait to CheckForTermination.
return;
}
// Adding a callback failed; control_fut was finished and we
// can CheckForTermination immediately. This also avoids recursion and potential
// stack overflow.
if (CheckForTermination(control_fut.result())) return;
control_fut = iterate();
}
}
Iterate iterate;
// If the future returned by control_fut is never completed then we will be hanging on
// to break_fut forever even if the listener has given up listening on it. Instead we
// rely on the fact that a producer (the caller of Future<>::Make) is always
// responsible for completing the futures they create.
// TODO: Could avoid this kind of situation with "future abandonment" similar to mesos
Future<BreakValueType> break_fut;
};
auto break_fut = Future<BreakValueType>::Make();
auto control_fut = iterate();
control_fut.AddCallback(Callback{std::move(iterate), break_fut});
return break_fut;
}
inline Future<> ToFuture(Status status) {
return Future<>::MakeFinished(std::move(status));
}
template <typename T>
Future<T> ToFuture(T value) {
return Future<T>::MakeFinished(std::move(value));
}
template <typename T>
Future<T> ToFuture(Result<T> maybe_value) {
return Future<T>::MakeFinished(std::move(maybe_value));
}
template <typename T>
Future<T> ToFuture(Future<T> fut) {
return fut;
}
template <typename T>
struct EnsureFuture {
using type = decltype(ToFuture(std::declval<T>()));
};
} // namespace arrow