Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

arrow-nightlies / pyarrow   python

Repository URL to install this package:

Version: 19.0.0.dev259 

/ include / arrow / util / future.h

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <atomic>
#include <cmath>
#include <functional>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

#include "arrow/result.h"
#include "arrow/status.h"
#include "arrow/type_fwd.h"
#include "arrow/type_traits.h"
#include "arrow/util/config.h"
#include "arrow/util/functional.h"
#include "arrow/util/macros.h"
#include "arrow/util/tracing.h"
#include "arrow/util/type_fwd.h"
#include "arrow/util/visibility.h"

namespace arrow {

// Forward declaration.  The definition near the end of this header exposes, as
// `type`, the Future type obtained by passing a value of the given type to ToFuture().
template <typename>
struct EnsureFuture;

namespace detail {

/// Type trait: false for any type that is not an instantiation of Future
template <typename NotAFuture>
struct is_future : std::false_type {};

/// Type trait: true when the queried type is Future<ValueT> for some ValueT
template <typename ValueT>
struct is_future<Future<ValueT>> : std::true_type {};

/// Computes the type produced by invoking a callable with the given argument types.
///
/// The primary template is only declared, so `result_of<Signature>::type` exists
/// solely when the partial specialization below is selected, i.e. when the call
/// expression is well-formed.
template <typename Signature, typename Enable = void>
struct result_of;

template <typename Callable, typename... CallArgs>
struct result_of<Callable(CallArgs...),
                 internal::void_t<decltype(std::declval<Callable>()(
                     std::declval<CallArgs>()...))>> {
  using type = decltype(std::declval<Callable>()(std::declval<CallArgs>()...));
};

/// Shorthand for `typename result_of<Signature>::type`
template <typename Signature>
using result_of_t = typename result_of<Signature>::type;

// Helper to find the synchronous counterpart for a Future:
// a Future<T> corresponds to Result<T> ...
template <typename T>
struct SyncType {
  using type = Result<T>;
};

// ... except for the empty value type, whose synchronous counterpart is a plain Status
template <>
struct SyncType<internal::Empty> {
  using type = Status;
};

/// True when the first parameter of callable `Fn`, after decaying, is Status
template <typename Fn>
using first_arg_is_status =
    std::is_same<std::decay_t<internal::call_traits::argument_type<0, Fn>>, Status>;

/// Selects `Then` when callable `Fn` takes no arguments and `Else` otherwise
template <typename Fn, typename Then, typename Else,
          typename Count = internal::call_traits::argument_count<Fn>>
using if_has_no_args = std::conditional_t<Count::value == 0, Then, Else>;

/// Creates a callback that can be added to a future to mark a `dest` future finished
template <typename Source, typename Dest, bool SourceEmpty = Source::is_empty,
          bool DestEmpty = Dest::is_empty>
struct MarkNextFinished {};

/// If the source and dest are both empty we can pass on the status
template <typename Source, typename Dest>
struct MarkNextFinished<Source, Dest, true, true> {
  void operator()(const Status& status) && { next.MarkFinished(status); }
  Dest next;
};

/// If the source is not empty but the dest is then we can take the
/// status out of the result
template <typename Source, typename Dest>
struct MarkNextFinished<Source, Dest, false, true> {
  void operator()(const Result<typename Source::ValueType>& res) && {
    next.MarkFinished(internal::Empty::ToResult(res.status()));
  }
  Dest next;
};

/// If neither are empty we pass on the result
template <typename Source, typename Dest>
struct MarkNextFinished<Source, Dest, false, false> {
  void operator()(const Result<typename Source::ValueType>& res) && {
    next.MarkFinished(res);
  }
  Dest next;
};

/// Helper that contains information about how to apply a continuation
///
/// The four call operator overloads below are selected through enable_if on the
/// callback's return type and do not overlap.  Each of them invokes the callback
/// `f` with the arguments `a...` and arranges for `next` to be marked finished.
struct ContinueFuture {
  // Maps a callback return type to the Future type used to continue with it.
  // The specializations are provided after this struct.
  template <typename Return>
  struct ForReturnImpl;

  // The continued Future type for a callback returning `Return`
  template <typename Return>
  using ForReturn = typename ForReturnImpl<Return>::type;

  // The continued Future type for a callback invoked with `Signature`
  template <typename Signature>
  using ForSignature = ForReturn<result_of_t<Signature>>;

  // If the callback returns void then we return Future<> that always finishes OK.
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<std::is_void<ContinueResult>::value>::type operator()(
      NextFuture next, ContinueFunc&& f, Args&&... a) const {
    std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
    next.MarkFinished();
  }

  /// If the callback returns a non-future then we return Future<T>
  /// and mark the future finished with the callback result.  It will get promoted
  /// to Result<T> as part of MarkFinished if it isn't already.
  ///
  /// If the callback returns Status and we return Future<> then also send the callback
  /// result as-is to the destination future.
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<
      !std::is_void<ContinueResult>::value && !is_future<ContinueResult>::value &&
      (!NextFuture::is_empty || std::is_same<ContinueResult, Status>::value)>::type
  operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
    next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...));
  }

  /// If the callback returns a Result and the next future is Future<> then we mark
  /// the future finished with the callback result.
  ///
  /// It may seem odd that the next future is Future<> when the callback returns a
  /// result but this can occur if the OnFailure callback returns a result while the
  /// OnSuccess callback is void/Status (e.g. you would get this calling the one-arg
  /// version of Then with an OnSuccess callback that returns void)
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<!std::is_void<ContinueResult>::value &&
                          !is_future<ContinueResult>::value && NextFuture::is_empty &&
                          !std::is_same<ContinueResult, Status>::value>::type
  operator()(NextFuture next, ContinueFunc&& f, Args&&... a) const {
    // Only the status of the returned value is forwarded to the empty future
    next.MarkFinished(std::forward<ContinueFunc>(f)(std::forward<Args>(a)...).status());
  }

  /// If the callback returns a Future<T> then we return Future<T>.  We create a new
  /// future and add a callback to the future given to us by the user that forwards the
  /// result to the future we just created
  template <typename ContinueFunc, typename... Args,
            typename ContinueResult = result_of_t<ContinueFunc && (Args && ...)>,
            typename NextFuture = ForReturn<ContinueResult>>
  typename std::enable_if<is_future<ContinueResult>::value>::type operator()(
      NextFuture next, ContinueFunc&& f, Args&&... a) const {
    ContinueResult signal_to_complete_next =
        std::forward<ContinueFunc>(f)(std::forward<Args>(a)...);
    // `next` is finished by the callback once the future returned by `f` finishes
    MarkNextFinished<ContinueResult, NextFuture> callback{std::move(next)};
    signal_to_complete_next.AddCallback(std::move(callback));
  }

  /// Helpers to conditionally ignore arguments to ContinueFunc
  ///
  /// With std::true_type the trailing arguments are dropped and `f` is invoked
  /// without arguments ...
  template <typename ContinueFunc, typename NextFuture, typename... Args>
  void IgnoringArgsIf(std::true_type, NextFuture&& next, ContinueFunc&& f,
                      Args&&...) const {
    operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f));
  }
  /// ... with std::false_type the arguments are forwarded to `f` unchanged.
  template <typename ContinueFunc, typename NextFuture, typename... Args>
  void IgnoringArgsIf(std::false_type, NextFuture&& next, ContinueFunc&& f,
                      Args&&... a) const {
    operator()(std::forward<NextFuture>(next), std::forward<ContinueFunc>(f),
               std::forward<Args>(a)...);
  }
};

/// Helper struct which tells us what kind of Future gets returned from `Then` based on
/// the return type of the OnSuccess callback
///
/// Mapping implemented by the definitions below:
///   void, Status           -> Future<>
///   Result<T>, Future<T>   -> Future<T>
///   any other type R       -> Future<R>
template <>
struct ContinueFuture::ForReturnImpl<void> {
  using type = Future<>;
};

template <>
struct ContinueFuture::ForReturnImpl<Status> {
  using type = Future<>;
};

// Generic case: a plain value of type R continues as Future<R>
template <typename R>
struct ContinueFuture::ForReturnImpl {
  using type = Future<R>;
};

// A Result<T> is unwrapped: the continuation is a Future<T>
template <typename T>
struct ContinueFuture::ForReturnImpl<Result<T>> {
  using type = Future<T>;
};

// A Future<T> is not nested: the continuation is a Future<T>
template <typename T>
struct ContinueFuture::ForReturnImpl<Future<T>> {
  using type = Future<T>;
};

}  // namespace detail

/// The execution or completion status of a Future
enum class FutureState : int8_t { PENDING, SUCCESS, FAILURE };

/// Whether `state` denotes a finished future, i.e. anything other than PENDING
inline bool IsFutureFinished(FutureState state) {
  const bool still_pending = (state == FutureState::PENDING);
  return !still_pending;
}

/// \brief Describe whether the callback should be scheduled or run synchronously
///
/// Used as the `should_schedule` field of CallbackOptions.
enum class ShouldSchedule {
  /// Always run the callback synchronously (the default)
  Never = 0,
  /// Schedule a new task only if the future is not finished when the
  /// callback is added
  IfUnfinished = 1,
  /// Always schedule the callback as a new task
  Always = 2,
  /// Schedule a new task only if it would run on an executor other than
  /// the specified executor.
  IfDifferentExecutor = 3,
};

/// \brief Settings describing how a continuation is executed
struct CallbackOptions {
  /// Whether the callback runs synchronously or is scheduled as a task
  ShouldSchedule should_schedule = ShouldSchedule::Never;
  /// Executor on which the callback is scheduled, when it is scheduled.
  /// If this is NULL then should_schedule must be Never
  internal::Executor* executor = NULLPTR;

  /// Options with every field left at its default value
  static CallbackOptions Defaults() { return CallbackOptions{}; }
};

// Untyped private implementation
//
// Holds what a Future points to: the completion state, the type-erased result and
// a list of callback records.  Member functions that are only declared here are
// defined outside of this header.
class ARROW_EXPORT FutureImpl : public std::enable_shared_from_this<FutureImpl> {
 public:
  FutureImpl();
  virtual ~FutureImpl() = default;

  /// Current completion state.
  ///
  /// Const-qualified so that it can also be queried through the
  /// `const FutureImpl&` that callbacks receive (see Callback below).
  FutureState state() const { return state_.load(); }

  static std::unique_ptr<FutureImpl> Make();
  static std::unique_ptr<FutureImpl> MakeFinished(FutureState state);

#ifdef ARROW_WITH_OPENTELEMETRY
  /// Remember a tracing span (non-owning pointer) for this future
  void SetSpan(util::tracing::Span* span) { span_ = span; }
#endif

  // Future API
  void MarkFinished();
  void MarkFailed();
  void Wait();
  bool Wait(double seconds);

  /// Reinterpret the type-erased result storage as a Result<ValueType>.
  ///
  /// No check is made here: the caller must name the same ValueType that was
  /// used when the result was stored.
  template <typename ValueType>
  Result<ValueType>* CastResult() const {
    return static_cast<Result<ValueType>*>(result_.get());
  }

  /// Type-erased, call-once callback receiving the implementation object
  using Callback = internal::FnOnce<void(const FutureImpl& impl)>;
  void AddCallback(Callback callback, CallbackOptions opts);
  bool TryAddCallback(const std::function<Callback()>& callback_factory,
                      CallbackOptions opts);

  std::atomic<FutureState> state_{FutureState::PENDING};

  // Type erased storage for arbitrary results
  // XXX small objects could be stored inline instead of boxed in a pointer
  using Storage = std::unique_ptr<void, void (*)(void*)>;
  Storage result_{NULLPTR, NULLPTR};

  /// A callback together with the options it was added with
  struct CallbackRecord {
    Callback callback;
    CallbackOptions options;
  };
  std::vector<CallbackRecord> callbacks_;
#ifdef ARROW_WITH_OPENTELEMETRY
  util::tracing::Span* span_ = NULLPTR;
#endif
};

// ---------------------------------------------------------------------
// Public API

/// \brief EXPERIMENTAL A std::future-like class with more functionality.
///
/// A Future represents the results of a past or future computation.
/// The Future API has two sides: a producer side and a consumer side.
///
/// The producer API allows creating a Future and setting its result or
/// status, possibly after running a computation function.
///
/// The consumer API allows querying a Future's current state, wait for it
/// to complete, and composing futures with callbacks.
template <typename T>
class [[nodiscard]] Future {
 public:
  using ValueType = T;
  using SyncType = typename detail::SyncType<T>::type;
  static constexpr bool is_empty = std::is_same<T, internal::Empty>::value;
  // The default constructor creates an invalid Future.  Use Future::Make()
  // for a valid Future.  This constructor is mostly for the convenience
  // of being able to presize a vector of Futures.
  Future() = default;

#ifdef ARROW_WITH_OPENTELEMETRY
  /// Forward a tracing span to the implementation object.  Only compiled when
  /// ARROW_WITH_OPENTELEMETRY is defined.  No validity check is performed.
  void SetSpan(util::tracing::Span* span) { impl_->SetSpan(span); }
#endif

  // Consumer API

  /// \brief Whether this Future refers to an implementation object
  /// (false for a default-constructed Future)
  bool is_valid() const { return static_cast<bool>(impl_); }

  /// \brief Return the Future's current state
  ///
  /// PENDING is only a hint since the Future may complete concurrently.
  /// FAILURE and SUCCESS are definitive.
  FutureState state() const {
    CheckValid();
    const FutureState snapshot = impl_->state();
    return snapshot;
  }

  /// \brief Whether the Future is finished
  ///
  /// A false return value is only indicative, as the Future can complete
  /// concurrently.  A true return value is definitive, though.
  bool is_finished() const {
    CheckValid();
    return IsFutureFinished(impl_->state());
  }

  /// \brief Block until the Future completes, then return a reference to its Result
  const Result<ValueType>& result() const& {
    Wait();
    const Result<ValueType>* stored = GetResult();
    return *stored;
  }

  /// \brief Wait for completion and return an rvalue reference to the result.
  /// This method is potentially unsafe
  ///
  /// The result is shared with every copy of this future, so the caller must
  /// make sure that no other copy exists.  Adding callbacks after the result has
  /// been moved from results in undefined behavior.
  Result<ValueType>&& MoveResult() {
    Wait();
    Result<ValueType>* stored = GetResult();
    return std::move(*stored);
  }

  /// \brief Block until the Future completes, then return the Status of its Result
  const Status& status() const {
    const Result<ValueType>& finished = result();
    return finished.status();
  }

  /// \brief Explicit conversion to Future<>, which shares this future's
  /// implementation object and views only the Status of the original.
  /// Marking the returned Future Finished is not supported.
  explicit operator Future<>() const { return Future<>(impl_); }

  /// \brief Wait for the Future to complete
  ///
  /// In debug builds an invalid (default-constructed) Future aborts here.
  void Wait() const {
    CheckValid();
    impl_->Wait();
  }

  /// \brief Wait until the Future completes or the timeout expires
  ///
  /// \param seconds the timeout, in seconds
  /// \return `true` if the Future completed, `false` if the timeout expired.
  /// A `false` value is only a hint since the Future may complete concurrently.
  bool Wait(double seconds) const {
    CheckValid();
    const bool completed = impl_->Wait(seconds);
    return completed;
  }

  // Producer API

  /// \brief Producer API: mark Future finished
  ///
  /// The Future's result is set to `res`.  Whether the future ends up in the
  /// successful or failed state follows from `res.ok()` (see DoMarkFinished).
  void MarkFinished(Result<ValueType> res) { DoMarkFinished(std::move(res)); }

  /// \brief Mark a Future<> completed with the provided Status (OK by default).
  ///
  /// Only participates in overload resolution for the empty value type.
  template <typename E = ValueType, typename = typename std::enable_if<
                                        std::is_same<E, internal::Empty>::value>::type>
  void MarkFinished(Status s = Status::OK()) {
    DoMarkFinished(E::ToResult(std::move(s)));
  }

  /// \brief Producer API: instantiate a valid Future
  ///
  /// The new Future starts out PENDING.  Whoever creates a future this way is
  /// responsible for eventually completing it (with success or failure).
  /// Creating a future, returning it, and never completing it can lead to memory
  /// leaks (for example, see Loop).
  static Future Make() {
    Future pending;
    pending.Initialize();
    return pending;
  }

  /// \brief Producer API: instantiate a Future that is already finished with `res`
  static Future<ValueType> MakeFinished(Result<ValueType> res) {
    Future<ValueType> finished;
    finished.InitializeFromResult(std::move(res));
    return finished;
  }

  /// \brief Make a finished Future<> with the provided Status (OK by default).
  ///
  /// Only participates in overload resolution for the empty value type.
  template <typename E = ValueType, typename = typename std::enable_if<
                                        std::is_same<E, internal::Empty>::value>::type>
  static Future<> MakeFinished(Status s = Status::OK()) {
    auto wrapped = E::ToResult(std::move(s));
    return MakeFinished(std::move(wrapped));
  }

  /// Adapter for user callbacks that accept `const Result<ValueType>&`.
  ///
  /// The nested Callback is invoked with the implementation object and passes
  /// the typed result stored in it on to the user callback.
  struct WrapResultOnComplete {
    template <typename OnComplete>
    struct Callback {
      void operator()(const FutureImpl& impl) && {
        std::move(on_complete)(*impl.CastResult<ValueType>());
      }
      OnComplete on_complete;
    };
  };

  /// Adapter for user callbacks that accept a Status instead of a Result.
  ///
  /// Only permitted on Future<> (enforced by the static_assert); the nested
  /// Callback passes the status of the stored result on to the user callback.
  struct WrapStatusyOnComplete {
    template <typename OnComplete>
    struct Callback {
      static_assert(std::is_same<internal::Empty, ValueType>::value,
                    "Only callbacks for Future<> should accept Status and not Result");

      void operator()(const FutureImpl& impl) && {
        std::move(on_complete)(impl.CastResult<ValueType>()->status());
      }
      OnComplete on_complete;
    };
  };

  /// Picks the adapter for `OnComplete`: the Status-based one when the callback's
  /// first argument is a Status, the Result-based one otherwise.
  template <typename OnComplete>
  using WrapOnComplete = typename std::conditional<
      detail::first_arg_is_status<OnComplete>::value, WrapStatusyOnComplete,
      WrapResultOnComplete>::type::template Callback<OnComplete>;

  /// \brief Consumer API: Register a callback to run when this future completes
  ///
  /// The callback should receive the result of the future (const Result<T>&)
  /// For a void or statusy future this should be (const Status&)
  ///
  /// There is no guarantee to the order in which callbacks will run.  In
  /// particular, callbacks added while the future is being marked complete
  /// may be executed immediately, ahead of, or even the same time as, other
  /// callbacks that have been previously added.
  ///
  /// WARNING: callbacks may hold arbitrary references, including cyclic references.
  /// Since callbacks will only be destroyed after they are invoked, this can lead to
  /// memory leaks if a Future is never marked finished (abandoned):
  ///
  /// {
  ///     auto fut = Future<>::Make();
  ///     fut.AddCallback([fut]() {});
  /// }
  ///
  /// In this example `fut` falls out of scope but is not destroyed because it holds a
  /// cyclic reference to itself through the callback.
  template <typename OnComplete, typename Callback = WrapOnComplete<OnComplete>>
  void AddCallback(OnComplete on_complete,
                   CallbackOptions opts = CallbackOptions::Defaults()) const {
    // We know impl_ will not be dangling when invoking callbacks because at least one
    // thread will be waiting for MarkFinished to return. Thus it's safe to keep a
    // weak reference to impl_ here
    impl_->AddCallback(Callback{std::move(on_complete)}, opts);
  }

  /// \brief Overload of AddCallback that will return false instead of running
  /// synchronously
  ///
  /// This overload will guarantee the callback is never run synchronously.  If the future
  /// is already finished then it will simply return false.  This can be useful to avoid
  /// stack overflow in a situation where you have recursive Futures.  For an example
  /// see the Loop function
  ///
  /// Takes in a callback factory function to allow moving callbacks (the factory function
  /// will only be called if the callback can successfully be added)
  ///
  /// Returns true if a callback was actually added and false if the callback failed
  /// to add because the future was marked complete.
  template <typename CallbackFactory,
            typename OnComplete = detail::result_of_t<CallbackFactory()>,
            typename Callback = WrapOnComplete<OnComplete>>
  bool TryAddCallback(CallbackFactory callback_factory,
                      CallbackOptions opts = CallbackOptions::Defaults()) const {
    return impl_->TryAddCallback([&]() { return Callback{callback_factory()}; }, opts);
  }

  /// Callback installed by Then(): dispatches the finished result to either
  /// `on_success` or `on_failure` and completes `next` accordingly.
  template <typename OnSuccess, typename OnFailure>
  struct ThenOnComplete {
    static constexpr bool has_no_args =
        internal::call_traits::argument_count<OnSuccess>::value == 0;

    // The future type produced by OnSuccess, invoked with either no argument or
    // a `const T&` depending on its arity
    using ContinuedFuture = detail::ContinueFuture::ForSignature<
        detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;

    static_assert(
        std::is_same<detail::ContinueFuture::ForSignature<OnFailure && (const Status&)>,
                     ContinuedFuture>::value,
        "OnSuccess and OnFailure must continue with the same future type");

    // Stand-in used to compute OnSuccessArg when OnSuccess takes no argument
    struct DummyOnSuccess {
      void operator()(const T&);
    };
    using OnSuccessArg = typename std::decay<internal::call_traits::argument_type<
        0, detail::if_has_no_args<OnSuccess, DummyOnSuccess, OnSuccess>>>::type;

    static_assert(
        !std::is_same<OnSuccessArg, typename EnsureResult<OnSuccessArg>::type>::value,
        "OnSuccess' argument should not be a Result");

    void operator()(const Result<T>& result) && {
      detail::ContinueFuture continue_future;
      if (ARROW_PREDICT_TRUE(result.ok())) {
        // move on_failure to a(n immediately destroyed) temporary to free its resources
        ARROW_UNUSED(OnFailure(std::move(on_failure)));
        continue_future.IgnoringArgsIf(
            detail::if_has_no_args<OnSuccess, std::true_type, std::false_type>{},
            std::move(next), std::move(on_success), result.ValueOrDie());
      } else {
        // likewise release on_success, which will not be called
        ARROW_UNUSED(OnSuccess(std::move(on_success)));
        continue_future(std::move(next), std::move(on_failure), result.status());
      }
    }

    OnSuccess on_success;
    OnFailure on_failure;
    ContinuedFuture next;
  };

  /// Default OnFailure callback for Then(): returns the error it receives,
  /// typed as a Result of the continued future's value type.
  template <typename OnSuccess>
  struct PassthruOnFailure {
    using ContinuedFuture = detail::ContinueFuture::ForSignature<
        detail::if_has_no_args<OnSuccess, OnSuccess && (), OnSuccess && (const T&)>>;

    Result<typename ContinuedFuture::ValueType> operator()(const Status& failure) {
      return failure;
    }
  };

  /// \brief Consumer API: Register a continuation to run when this future completes
  ///
  /// The continuation will run in the same thread that called MarkFinished (whatever
  /// callback is registered with this function will run before MarkFinished returns).
  /// Avoid long-running callbacks in favor of submitting a task to an Executor and
  /// returning the future.
  ///
  /// Two callbacks are supported:
  /// - OnSuccess, called with the result (const ValueType&) on successful completion.
  ///              for an empty future this will be called with nothing ()
  /// - OnFailure, called with the error (const Status&) on failed completion.
  ///              This callback is optional and defaults to a passthru of any errors.
  ///
  /// Then() returns a Future whose ValueType is derived from the return type of the
  /// callbacks. If a callback returns:
  /// - void, a Future<> will be returned which will complete successfully as soon
  ///   as the callback runs.
  /// - Status, a Future<> will be returned which will complete with the returned Status
  ///   as soon as the callback runs.
  /// - V or Result<V>, a Future<V> will be returned which will complete with the result
  ///   of invoking the callback as soon as the callback runs.
  /// - Future<V>, a Future<V> will be returned which will be marked complete when the
  ///   future returned by the callback completes (and will complete with the same
  ///   result).
  ///
  /// The continued Future type must be the same for both callbacks.
  ///
  /// Note that OnFailure can swallow errors, allowing continued Futures to successfully
  /// complete even if this Future fails.
  ///
  /// If this future is already completed then the callback will be run immediately
  /// and the returned future may already be marked complete.
  ///
  /// The `options` argument is passed on unchanged to AddCallback.
  ///
  /// See AddCallback for general considerations when writing callbacks.
  template <typename OnSuccess, typename OnFailure = PassthruOnFailure<OnSuccess>,
            typename OnComplete = ThenOnComplete<OnSuccess, OnFailure>,
            typename ContinuedFuture = typename OnComplete::ContinuedFuture>
  ContinuedFuture Then(OnSuccess on_success, OnFailure on_failure = {},
                       CallbackOptions options = CallbackOptions::Defaults()) const {
    // `next` is shared between the registered callback and the caller
    auto next = ContinuedFuture::Make();
    AddCallback(OnComplete{std::forward<OnSuccess>(on_success),
                           std::forward<OnFailure>(on_failure), next},
                options);
    return next;
  }

  /// \brief Implicit constructor to create a finished future from a value
  ///
  /// The future is created in the SUCCESS state and stores `val` as its result.
  Future(ValueType val) : Future() {  // NOLINT runtime/explicit
    impl_ = FutureImpl::MakeFinished(FutureState::SUCCESS);
    SetResult(std::move(val));
  }

  /// \brief Implicit constructor to create a finished future from a Result,
  ///     enabling use of macros like ARROW_ASSIGN_OR_RAISE.
  ///
  /// The future is SUCCESS when `res` is ok and FAILURE otherwise.
  Future(Result<ValueType> res) : Future() {  // NOLINT runtime/explicit
    InitializeFromResult(std::move(res));
  }

  /// \brief Implicit constructor to create a future from a Status, enabling use
  ///     of macros like ARROW_RETURN_NOT_OK.
  ///
  /// Delegates to the Result constructor after wrapping `s` in a Result<ValueType>.
  Future(Status s)  // NOLINT runtime/explicit
      : Future(Result<ValueType>(std::move(s))) {}

 protected:
  /// Make this an already-finished future holding `res`: the state is SUCCESS
  /// when `res` is ok and FAILURE otherwise.
  void InitializeFromResult(Result<ValueType> res) {
    const FutureState final_state =
        ARROW_PREDICT_TRUE(res.ok()) ? FutureState::SUCCESS : FutureState::FAILURE;
    impl_ = FutureImpl::MakeFinished(final_state);
    SetResult(std::move(res));
  }

  /// Replace the implementation object with a fresh one from FutureImpl::Make()
  void Initialize() { impl_ = FutureImpl::Make(); }

  /// Typed pointer to the result storage of the implementation object.
  /// No validity check is performed here.
  Result<ValueType>* GetResult() const { return impl_->CastResult<ValueType>(); }

  /// Store `res` in the type-erased result slot of the implementation object.
  ///
  /// The storage is type-erased, so the deleter has to restore the concrete
  /// Result type before deleting.
  void SetResult(Result<ValueType> res) {
    auto destroy_result = [](void* p) { delete static_cast<Result<ValueType>*>(p); };
    impl_->result_ = {new Result<ValueType>(std::move(res)), destroy_result};
  }

  /// Store `res` and then mark the implementation finished or failed,
  /// depending on whether the stored result is ok.
  void DoMarkFinished(Result<ValueType> res) {
    SetResult(std::move(res));

    const bool failed = !GetResult()->ok();
    if (ARROW_PREDICT_FALSE(failed)) {
      impl_->MarkFailed();
      return;
    }
    impl_->MarkFinished();
  }

  void CheckValid() const {
#ifndef NDEBUG
    if (!is_valid()) {
      Status::Invalid("Invalid Future (default-initialized?)").Abort();
    }
#endif
  }

  /// Wrap an existing implementation object (which may be null)
  explicit Future(std::shared_ptr<FutureImpl> impl) : impl_(std::move(impl)) {}

  std::shared_ptr<FutureImpl> impl_;

  friend struct detail::ContinueFuture;

  template <typename U>
  friend class Future;
  friend class WeakFuture<T>;

  FRIEND_TEST(FutureRefTest, ChainRemoved);
  FRIEND_TEST(FutureRefTest, TailRemoved);
  FRIEND_TEST(FutureRefTest, HeadRemoved);
};

/// Wait for `fut` and return a copy of its outcome as the synchronous type
/// (Result<T> for this generic overload).
template <typename T>
typename Future<T>::SyncType FutureToSync(const Future<T>& fut) {
  return fut.result();
}

/// Specialization for Future<>: wait for `fut` and return its Status
template <>
inline typename Future<internal::Empty>::SyncType FutureToSync<internal::Empty>(
    const Future<internal::Empty>& fut) {
  return fut.status();
}

/// Specialization of the Status constructor for Future<>: the Status is turned
/// into a Result through internal::Empty::ToResult before delegating.
template <>
inline Future<>::Future(Status s) : Future(internal::Empty::ToResult(std::move(s))) {}

/// A non-owning handle to a Future: it keeps a weak reference to the future's
/// implementation object and therefore does not keep it alive.
template <typename T>
class WeakFuture {
 public:
  explicit WeakFuture(const Future<T>& future) : impl_(future.impl_) {}

  /// Try to obtain a Future sharing the referenced implementation object.
  ///
  /// If that object has already been destroyed the returned Future is invalid
  /// (is_valid() returns false).  Const-qualified since the handle itself is
  /// not modified, which allows calling it on a const WeakFuture or from a
  /// non-mutable lambda capture.
  Future<T> get() const { return Future<T>{impl_.lock()}; }

 private:
  std::weak_ptr<FutureImpl> impl_;
};

/// \defgroup future-utilities Functions for working with Futures
/// @{

/// If a Result<Future> holds an error instead of a Future, construct a finished Future
/// holding that error.  Otherwise the contained Future is returned as-is.
///
/// Not declared `static`: a function template defined in a header needs no
/// internal linkage, which would give every translation unit its own copy of
/// each instantiation.
template <typename T>
Future<T> DeferNotOk(Result<Future<T>> maybe_future) {
  if (ARROW_PREDICT_FALSE(!maybe_future.ok())) {
    return Future<T>::MakeFinished(std::move(maybe_future).status());
  }
  return std::move(maybe_future).MoveValueUnsafe();
}

/// \brief Create a Future which completes when all of `futures` complete.
///
/// The resulting future holds one Result per input future, in input order.
/// It is never marked "failed": failed results are stored in the vector
/// alongside the successful ones.  An empty input yields an already-finished
/// future holding an empty vector.
template <typename T>
Future<std::vector<Result<T>>> All(std::vector<Future<T>> futures) {
  using ResultVector = std::vector<Result<T>>;

  // Shared between all callbacks: the inputs plus a countdown of unfinished ones
  struct State {
    explicit State(std::vector<Future<T>> f)
        : futures(std::move(f)), n_remaining(futures.size()) {}

    std::vector<Future<T>> futures;
    std::atomic<size_t> n_remaining;
  };

  if (futures.empty()) {
    return {ResultVector{}};
  }

  auto shared_state = std::make_shared<State>(std::move(futures));
  auto all_done = Future<ResultVector>::Make();

  for (const Future<T>& pending : shared_state->futures) {
    pending.AddCallback([shared_state, all_done](const Result<T>&) mutable {
      // Only the callback that observes the last decrement gathers the results
      const bool is_last = (shared_state->n_remaining.fetch_sub(1) == 1);
      if (!is_last) return;

      ResultVector collected(shared_state->futures.size());
      size_t slot = 0;
      for (const Future<T>& done : shared_state->futures) {
        collected[slot++] = done.result();
      }
      all_done.MarkFinished(std::move(collected));
    });
  }
  return all_done;
}

/// \brief Create a Future which completes when all of `futures` complete
/// successfully, or when one of them fails.
///
/// The future will be marked complete if all `futures` complete
/// successfully. Otherwise, it will be marked failed with the status of
/// the first failing future.  As the AllFinished documentation states, it
/// does not wait for the remaining futures once a failure occurs.
ARROW_EXPORT
Future<> AllComplete(const std::vector<Future<>>& futures);

/// \brief Create a Future which completes once every one of `futures` has
/// finished, whether successfully or not.
///
/// The future will finish with an ok status if all `futures` finish with
/// an ok status. Otherwise, it will be marked failed with the status of
/// one of the failing futures.
///
/// Unlike AllComplete this Future will not complete immediately when a
/// failure occurs.  It will wait until all futures have finished.
ARROW_EXPORT
Future<> AllFinished(const std::vector<Future<>>& futures);

/// @}

/// Tag type for Loop bodies: an rvalue converts to an empty std::optional<T>
/// for any T, i.e. to a ControlFlow carrying no break value.
struct Continue {
  template <typename T>
  operator std::optional<T>() && {  // NOLINT explicit
    return std::nullopt;
  }
};

/// Counterpart of Continue: wraps `break_value` in an engaged std::optional<T>
template <typename T = internal::Empty>
std::optional<T> Break(T break_value = {}) {
  std::optional<T> engaged(std::move(break_value));
  return engaged;
}

/// Value yielded by each Loop iteration: an empty optional (see Continue) asks for
/// another iteration, an engaged one (see Break) carries the loop's final value.
template <typename T = internal::Empty>
using ControlFlow = std::optional<T>;

/// \brief Loop through an asynchronous sequence
///
/// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On completion
/// of each yielded future the resulting ControlFlow will be examined. A Break will
/// terminate the loop, while a Continue will re-invoke `iterate`.
///
/// \return A future which will complete when a Future returned by iterate completes with
/// a Break.  If a Future returned by iterate completes with an error, the returned
/// future is finished with that error instead.
template <typename Iterate,
          typename Control = typename detail::result_of_t<Iterate()>::ValueType,
          typename BreakValueType = typename Control::value_type>
Future<BreakValueType> Loop(Iterate iterate) {
  struct Callback {
    // Finishes break_fut and returns true when the loop must stop (error or Break);
    // returns false when the loop should continue.
    bool CheckForTermination(const Result<Control>& control_res) {
      if (!control_res.ok()) {
        break_fut.MarkFinished(control_res.status());
        return true;
      }
      if (control_res->has_value()) {
        break_fut.MarkFinished(**control_res);
        return true;
      }
      return false;
    }

    void operator()(const Result<Control>& maybe_control) && {
      if (CheckForTermination(maybe_control)) return;

      auto control_fut = iterate();
      while (true) {
        // The factory hands out a copy of this callback, made only if it is added
        if (control_fut.TryAddCallback([this]() { return *this; })) {
          // Adding a callback succeeded; control_fut was not finished
          // and we must wait to CheckForTermination.
          return;
        }
        // Adding a callback failed; control_fut was finished and we
        // can CheckForTermination immediately. This also avoids recursion and potential
        // stack overflow.
        if (CheckForTermination(control_fut.result())) return;

        control_fut = iterate();
      }
    }

    Iterate iterate;

    // If the future returned by control_fut is never completed then we will be hanging on
    // to break_fut forever even if the listener has given up listening on it.  Instead we
    // rely on the fact that a producer (the caller of Future<>::Make) is always
    // responsible for completing the futures they create.
    // TODO: Could avoid this kind of situation with "future abandonment" similar to mesos
    Future<BreakValueType> break_fut;
  };

  auto break_fut = Future<BreakValueType>::Make();
  // The first iteration is requested before `iterate` is moved into the callback
  auto control_fut = iterate();
  control_fut.AddCallback(Callback{std::move(iterate), break_fut});

  return break_fut;
}

/// Convert a Status into a finished Future<>
inline Future<> ToFuture(Status status) {
  return Future<>::MakeFinished(std::move(status));
}

/// Convert a plain value into a finished Future<T>
template <typename T>
Future<T> ToFuture(T value) {
  return Future<T>::MakeFinished(std::move(value));
}

/// Convert a Result<T> into a finished Future<T>
template <typename T>
Future<T> ToFuture(Result<T> maybe_value) {
  return Future<T>::MakeFinished(std::move(maybe_value));
}

/// A Future<T> is returned unchanged
template <typename T>
Future<T> ToFuture(Future<T> fut) {
  return fut;
}

/// Type function: `type` is the Future type that ToFuture() returns when called
/// with a value of type T.
template <typename T>
struct EnsureFuture {
  using type = decltype(ToFuture(std::declval<T>()));
};

}  // namespace arrow