// torch/include/c10d/reducer.hpp
#pragma once

#include <atomic>
#include <memory>
#include <mutex>
#include <tuple>
#include <unordered_map>
#include <unordered_set>
#include <vector>

#include <ATen/core/ivalue_inl.h>
#include <c10/macros/Macros.h>
#include <c10/util/intrusive_ptr.h>
#include <c10d/ProcessGroup.hpp>
#include <c10d/Utils.hpp>
#include <c10d/comm.hpp>
#include <c10d/debug.h>
#include <c10d/reducer_timer.hpp>
#include <c10d/default_comm_hooks.hpp>
#include <torch/csrc/autograd/function.h>
#include <torch/csrc/autograd/profiler.h>
#include <torch/csrc/autograd/variable.h>
#ifndef _WIN32
#include <torch/csrc/distributed/autograd/context/context.h>
#endif

namespace c10d {

constexpr int kDefaultFirstBucketBytes = int(1024 * 1024);
constexpr int kDefaultBucketBytesCap = int(25 * 1024 * 1024);
// Collect runtime stats once for every kDDPRuntimeLoggingSampleRate iterations.
constexpr int kDDPRuntimeLoggingSampleRate = 100;

// Forward declaration
class Logger;

// Local accumulator type for a single bucket.
struct BucketAccumulator {
  std::vector<size_t> indices;
  size_t size = 0;
  size_t size_limit = 0;
};
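
// Illustrative sketch (an assumption about usage, not something prescribed by
// this header): compute_bucket_assignment_by_size() can fill an accumulator
// roughly like
//
//   BucketAccumulator bucket;
//   bucket.size_limit = kDefaultFirstBucketBytes;
//   bucket.indices.push_back(tensor_index);
//   bucket.size += tensor.numel() * tensor.element_size();
//   if (bucket.size >= bucket.size_limit) {
//     // Seal this bucket and start a new accumulator for the next one.
//   }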

class TORCH_API Reducer {
 public:
  // The constructor takes a list of variables (i.e. parameters) for this
  // process's single model replica (as DDP assumes single-process
  // single-device). The bucket assignment for this reducer, `bucket_indices`,
  // is specified as a list of buckets, each of which is specified as a list of
  // indices into the bucket's `variables` list.
  explicit Reducer(
      std::vector<at::Tensor> params,
      std::vector<std::vector<size_t>> bucket_indices,
      std::vector<size_t> per_bucket_size_limits,
      c10::intrusive_ptr<c10d::ProcessGroup> process_group,
      std::vector<bool> expect_sparse_gradients,
      int64_t bucket_bytes_cap,
      bool find_unused_parameters,
      bool gradient_as_bucket_view,
      std::unordered_map<size_t, std::string> param_names,
      int64_t first_bucket_bytes_cap);
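
  // Illustrative shape of the `bucket_indices` argument above (a sketch, not
  // prescribed by this header): for a model with four parameters, a possible
  // assignment is
  //
  //   std::vector<std::vector<size_t>> bucket_indices = {{3, 2}, {1, 0}};
  //
  // i.e. parameters 3 and 2 share the first bucket and parameters 1 and 0
  // share the second. DistributedDataParallel typically derives this from
  // compute_bucket_assignment_by_size() (declared at the bottom of this
  // file), bucketing parameters roughly in reverse registration order, since
  // gradients tend to become ready in that order during the backward pass.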

  ~Reducer() noexcept(false);

  // To (re-)initialize bucket assignment, pass a list of buckets, each of
  // which is specified by a list of indices in the bucket's `variables` list.
  // This function performs validation that the variables within a bucket
  // all live on the same device and have the same dimensionality.
  void initialize_buckets(std::vector<std::vector<size_t>> bucket_indices);

  // This function is called when the forward function has produced an output,
  // and the user wishes to reduce gradients in the backwards pass.
  // If they don't, and wish to accumulate gradients before reducing them,
  // a call to this function can simply be omitted.
  void prepare_for_backward(const std::vector<at::Tensor>& outputs);

  // Called at the beginning of forward() inside DistributedDataParallel;
  // right now it captures the starting time of the forward pass in each
  // iteration.
  void prepare_for_forward();

  // Returns the relative time in nanoseconds when gradients were ready,
  // with respect to the time `prepare_for_backward` was called. The
  // vector is for parameters for a single model replica.
  std::vector<int64_t> get_backward_stats() const {
    return backward_stats_;
  }

  // Registers a hook to the reducer. The hook is of `CommHookInterface`
  // type to allow both Python and C++ hooks. This function can only
  // be called once, before calling backward.
  // Cannot be combined with a call to `register_builtin_comm_hook`.
  void register_comm_hook(std::unique_ptr<CommHookInterface> iface);

  // Registers a built-in C++ comm hook to the reducer. This function can only
  // be called once, before calling backward.
  // Cannot be combined with a call to `register_comm_hook`.
  void register_builtin_comm_hook(c10d::BuiltinCommHookType comm_hook_type);
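
  // Illustrative call of register_builtin_comm_hook (a sketch; assumes the
  // built-in hook types declared in default_comm_hooks.hpp, e.g. FP16
  // compression, are available):
  //
  //   reducer.register_builtin_comm_hook(
  //       c10d::BuiltinCommHookType::FP16_COMPRESS);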

  // Runs allreduce or installed communication hook given GradBucket instance.
  c10::intrusive_ptr<c10::ivalue::Future> run_comm_hook(
      GradBucket& grad_bucket);

  // Returns gradient buckets in the sequential order of buckets_. This is the
  // order in which buckets are reduced across processes. If
  // return_zero_tensors=true, zero tensors of the same shape are returned
  // instead of the real tensors.
  std::vector<c10d::GradBucket> get_grad_buckets(
      bool return_zero_tensors = true) const;

  // Rebuild buckets based on rebuilt_params_ and rebuilt_param_indices_
  // according to when tensors received grads in the backward pass.
  // TODO: this function makes a broadcast communication call and could be
  // overlapped with the next forward() call, so it could be made async.
  // We will make it async when rebuilding buckets for the
  // find_unused_parameters = true case, since buckets can be rebuilt more
  // than once in that case (subgraphs are trained and the parameter index
  // order may change more frequently).
  // For the find_unused_parameters = false case, buckets are only rebuilt
  // once, so the performance cost is negligible. Returns true if the buckets
  // were rebuilt.
  bool rebuild_buckets();

  // Installs futures that should be awaited at the end of the backward pass.
  // Currently these are only used by user-defined custom buffer reduction
  // hooks, but this can be generalized to any user-originating futures that
  // need to be awaited.
  void install_futures(c10::List<c10::intrusive_ptr<c10::ivalue::Future>> futs);

  // Returns true if we should rebuild buckets, else false. We only rebuild
  // buckets once after the first iteration and never rebuild them if
  // find_unused_parameters_ is set, unless the graph is static.
  inline bool should_rebuild_buckets() const {
    return (static_graph_ || !find_unused_parameters_) && !has_rebuilt_bucket_;
  }

  // Pushes all parameters to be rebuilt.
  void push_rebuilt_params_for_all_indices();

  // Creates and sets ForwardPassWorkHandle given a ProcessGroup::Work and the
  // corresponding tensor being reduced.
  void set_forward_pass_work_handle(
      c10::intrusive_ptr<c10d::ProcessGroup::Work> forwardPassWorkHandle,
      bool useStaticWorldSize);

  // Retrieves the on-device tensor used to track locally unused parameters.
  // Its i-th element is 1 if the variable with index i has been used.
  at::Tensor get_local_used_map_on_device() const;

  // A function for users to set the sample_rate for collecting
  // runtime stats. Time stats are recorded for the
  // first 10 iterations; after 10 iterations they are
  // recorded once every "sample_rate" training iterations.
  void set_ddp_runtime_logging_sample_rate(int sample_rate);
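
  // For example, with the default sample rate of kDDPRuntimeLoggingSampleRate
  // (100), set via set_ddp_runtime_logging_sample_rate above, time stats are
  // recorded for the first 10 iterations and thereafter only on every 100th
  // iteration.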

  // Specifies that the training graph is static.
  void set_static_graph();

  // Delays allreduce until the computation of all gradients is complete.
  void delay_all_reduce();

  // Sets a weak reference to the associated DDP logger. The reference is weak
  // to avoid a reference cycle between the reducer and the logger.
  void set_logger(std::weak_ptr<c10d::Logger> logger);

  // When the graph is not explicitly set as static by the user and has unused
  // parameters, this returns whether the graph has been static up to the
  // current iteration, i.e. whether the set of unused parameters has not
  // changed.
  bool ddp_graph_static();

 protected:
  // Forward declaration.
  struct Bucket;

  void push_rebuilt_params(const size_t& index);

  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  mutable std::mutex mutex_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  const std::vector<at::Tensor> params_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  const c10::intrusive_ptr<::c10d::ProcessGroup> process_group_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  std::vector<bool> expect_sparse_gradients_;

  std::vector<std::shared_ptr<torch::autograd::Node>>
      grad_accumulators_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes)
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  std::unordered_map<torch::autograd::Node*, size_t> gradAccToVariableMap_;
  std::vector<std::pair<uintptr_t, std::shared_ptr<torch::autograd::Node>>>
      hooks_; // NOLINT(cppcoreguidelines-non-private-member-variables-in-classes)

  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  bool expect_autograd_hooks_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  bool require_finalize_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  size_t next_bucket_;

  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  bool has_marked_unused_parameters_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  const bool find_unused_parameters_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  const bool gradient_as_bucket_view_;
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  std::vector<size_t> unused_parameters_;
  // Previous iteration's unused params, used for checking if unused parameters
  // change between iterations. Only filled during the first backwards call.
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  std::vector<size_t> prev_iteration_unused_parameters_;
  // Whether the graph is static or not. When the user does not explicitly set
  // a static graph, the only possible dynamism is the set of unused
  // parameters changing between iterations, which is tracked by this flag.
  // NOLINTNEXTLINE(cppcoreguidelines-non-private-member-variables-in-classes)
  bool ddp_graph_static_{true};
  // Locally used parameter maps indicating whether parameters were used
  // locally during the current iteration or, if no_sync is on, the current
  // no_sync session. Each map is a one-dimensional int32 tensor with one
  // element per parameter. These tensors are marked in autograd_hook to
  // indicate that the corresponding param has been used, and are allreduced
  // at the end of the backward pass of the current iteration or no_sync
  // session to figure out the globally unused parameters.
  //
  // local_used_map_:     CPU tensor for bookkeeping locally used params
  // local_used_map_dev_: dev tensor for reducing globally unused params
  at::Tensor local_used_map_;
  at::Tensor local_used_map_dev_;
  // Indicates that the reduction and the D2H copy are both done.
  bool local_used_map_reduced_;

  // Weak pointer to associated DDP logger.
  std::weak_ptr<c10d::Logger> logger_;
  // List of futures installed by Reducer::install_futures that should be
  // awaited at the end of the backward pass.
  c10::optional<c10::List<c10::intrusive_ptr<c10::ivalue::Future>>> installed_futures_{c10::nullopt};

  // Work handle for allreduce on local_used_map_
  c10::intrusive_ptr<c10d::ProcessGroup::Work> local_used_work_;

  void mark_variable_ready_dense(size_t variable_index);

  void mark_variable_ready_sparse(size_t variable_index);

  void mark_variable_ready(size_t variable_index);

  void autograd_hook(size_t index);

  void mark_bucket_ready(size_t bucket_index);

  void finalize_bucket_dense(Bucket& bucket);

  void finalize_backward();

  // Returns the list of model parameters corresponding to the given bucket.
  // bucket_index is used as a cache key after buckets are rebuilt, after
  // which this mapping never changes.
  std::vector<at::Tensor> get_variables_for_bucket(
      size_t bucket_index, const Bucket& bucket) const;

  // Asserts that the reduction for the previous iteration has finished before
  // rebuilding buckets or kicking off the next one.
  void ensure_prior_reduction_finished();

  // Broadcast rebuilt buckets from rank 0 to other ranks before initializing
  // the buckets
  void sync_bucket_indices(std::vector<std::vector<size_t>>& bucket_indices);

  // We'd like to use DistAutogradContext::GradCallback here but dist autograd
  // doesn't exist under Windows. So we just directly use the concrete type but
  // to preserve and enforce our original intent we do a static assert when dist
  // autograd is available.
  using GradCallback = std::function<bool(at::Tensor&)>;
#ifndef _WIN32
  static_assert(
      std::is_same<
          GradCallback,
          torch::distributed::autograd::DistAutogradContext::GradCallback>::
          value,
      "");
#endif
  void runGradCallbackForVariable(at::Tensor& variable, GradCallback&& cb);
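
  // Illustrative use of runGradCallbackForVariable (a sketch, assuming the
  // convention that the bool return value indicates whether the gradient was
  // modified in place):
  //
  //   runGradCallbackForVariable(variable, [](at::Tensor& grad) {
  //     grad.zero_();
  //     return true;  // grad was modified
  //   });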

  // This function is called inside `initialize_buckets()`. It initializes both
  // `bucket_views_in` and `bucket_views_out` with views for each variable's
  // gradient into the bucket's flattened `gradients` tensor. Views serve as
  // entry points to `copy_()` each grad's data in/out of the flattened
  // `gradients` tensor.
  void initialize_bucket_views(Bucket& bucket);

  // This function is called inside `finalize_backward`. It only runs if a DDP
  // communication hook was registered, and recreates just `bucket_views_out`
  // with the result of `future_work`.
  void populate_bucket_views_out(Bucket& bucket, at::Tensor& tensor);

  // If gradient_as_bucket_view_ is false, copies the bucket results back to
  // the grads after the buckets are allreduced.
  void copy_bucket_to_grad(
      at::Tensor& variable,
      Reducer::Bucket& bucket,
      size_t intra_bucket_index,
      bool global_unused);
  // Check layout of grad and bucket_view before copying the grad to bucket.
  void check_grad_layout(const at::Tensor& grad, const at::Tensor& bucket_view);

  // A bucket contains [1..N] gradients to be reduced, where the gradients
  // have the same dtype and device.
  // Coalescing gradients together before reducing can result in lower overhead
  // and/or faster time to completion. Coalescing requires the constituent
  // gradients to have the same dtype and device, and the resulting flattened
  // tensor uses that common dtype and device. The flattened tensor is filled
  // as the corresponding gradients are computed (triggered by autograd hooks),
  // and the buckets are reduced in a predetermined order consistent across
  // processes.
  struct Bucket {
    // Gradients of the bucket flattened into a 1-dimensional tensor
    at::Tensor gradients;

    // Views into the `gradients` tensor for each individual gradient
    // Each view is created with layout (size and stride) matching the
    // gradient's expected layout (see the "Gradient Layout Contract" in
    // torch/csrc/autograd/functions/accumulate_grad.h).
    // `bucket_views_in[i].copy_(grad)` and `grad.copy_(bucket_views_out[i])`
    // provide convenient ways to copy gradient data in/out of `gradients`,
    // respectively.
    // We keep both `bucket_views_in` and `bucket_views_out` because
    // registering a DDP communication hook may re-initialize
    // `bucket_views_out` with the value of the hook's `future_work` but we
    // still need separate views into the bucket's original flattened gradient
    // to copy in gradient data.
    std::vector<at::Tensor> bucket_views_in;
    std::vector<at::Tensor> bucket_views_out;

    // Variables whose gradients are held in this bucket
    // We use refcounted tensors here so that we can easily unflatten the
    // bucket's flattened `gradients` tensor into the participating variables
    // after reduction has completed.
    std::vector<at::Tensor> variables;

    // Per-variable offset/length into the flattened `gradients` tensor and
    // the corresponding `GradBucket` instance for communication hooks
    std::vector<size_t> offsets;
    std::vector<size_t> lengths;

    // Per-variable sizes slicing into the bucket's `gradients` tensor
    std::vector<c10::IntArrayRef> sizes_vec;

    // Number of gradients left to be computed before the bucket is ready to
    // be reduced
    size_t pending;

    // Global indices of participating variables in the bucket
    std::vector<size_t> variable_indices;

    // Future work handle for DDP communication hook
    // If no hook is registered, a temporary vanilla allreduce hook is used.
    c10::intrusive_ptr<at::ivalue::Future> future_work;

    // If this bucket should expect a single sparse gradient
    // If `true`, then this implies that `bucket.variables.size() == 1`.
    bool expect_sparse_gradient = false;

    // TODO(@pietern)
    // Memory copies from gradient tensors into the bucket are potentially
    // done on different CUDA streams. We record an event for every copy
    // so that we can synchronize with them prior to kicking off the reduction.
    // std::vector<at::cuda::CUDAEvent> events;

  };
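
  // Illustrative relationship between the fields above (a sketch, not a
  // guarantee of the exact implementation): for the i-th variable in a
  // bucket, its input view is roughly
  //
  //   bucket.gradients
  //       .narrow(0, bucket.offsets[i], bucket.lengths[i])
  //       .view(bucket.sizes_vec[i])
  //
  // (modulo the strides required by the Gradient Layout Contract), so copying
  // a gradient into bucket_views_in[i] writes it into the flattened tensor
  // that is handed to the communication backend.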

  std::vector<Bucket> buckets_;

  // A variable locator locates a particular variable in the reducer's buckets
  struct VariableLocator {
    // Index of the bucket containing the variable in the `buckets_` vector
    size_t bucket_index;
    // Index of the variable in the bucket, which may be used consistently
    // across `bucket_views_in`, `bucket_views_out`, `variables`, `offsets`,
    // `lengths`, `sizes_vec`, and `variable_indices` in `Bucket`
    size_t intra_bucket_index;

    VariableLocator() = default;

    VariableLocator(size_t bucket_index_, size_t intra_bucket_index_) {
      bucket_index = bucket_index_;
      intra_bucket_index = intra_bucket_index_;
    }
  };

  // Map the index of a variable to its location in the bucket structure.
  std::vector<VariableLocator> variable_locators_;
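
  // Illustrative lookup (a sketch): given a global variable index `idx`,
  //
  //   const VariableLocator& loc = variable_locators_[idx];
  //   at::Tensor& variable =
  //       buckets_[loc.bucket_index].variables[loc.intra_bucket_index];
  //
  // retrieves the parameter tensor; the same intra_bucket_index also indexes
  // `bucket_views_in`, `bucket_views_out`, `offsets`, and `lengths`.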

  // Tracks the number of iterations that have synchronized grads during
  // training so far.
  long num_iterations_;
  // Tracks the number of buckets that have been ready for
  // communication calls like allreduce or communication hooks.
  int num_buckets_ready_;

  // Timing information.
  int64_t backward_compute_start_time_ = -1;
  std::unique_ptr<Timer> timer_;

  // We collect the relative timestamp of every gradient being ready
  // when executing autograd. This can be used to derive a timeline of
  // the point in time buckets were ready, or ideal bucket assignment/ordering.
  std::vector<int64_t> backward_stats_;

  bool should_collect_runtime_stats();
  void record_forward_compute_start_time();
  void record_backward_compute_start_time();
  void record_backward_compute_end_time();
  void record_backward_comm_start_time();
  void record_backward_comm_end_time();

  int get_ddp_runtime_logging_sample_rate();
  int ddp_runtime_logging_sample_rate_ = kDDPRuntimeLoggingSampleRate;

  bool is_multi_device_module_ = false;

  // The following variables help build the dynamic bucket order.
  bool has_rebuilt_bucket_;
  std::vector<at::Tensor> rebuilt_params_;
  std::vector<int64_t> rebuilt_param_indices_;
  const int64_t bucket_bytes_cap_;

#ifndef _WIN32
  struct RpcContext {
    using ContextPtr = torch::distributed::autograd::ContextPtr;
    // The shared_ptr is to hold the context instance.
    ContextPtr context_ptr_holder;
    std::atomic<ContextPtr::element_type*> context_ptr{nullptr};

    void set(ContextPtr&& new_context_ptr);
  };
  RpcContext rpc_context_;
#endif

  // A struct containing work handle and tensor for allreduce scheduled in
  // forward pass, if applicable.
  struct ForwardPassAllreduceWork {
    c10::intrusive_ptr<c10d::ProcessGroup::Work> workHandle;
    at::Tensor resultTensor;
    // Whether we should divide by the initial world_size or the number of
    // remaining DDP ranks.
    bool useStaticWorldSize;
  };

  // Handle for the currently scheduled allreduce in the forward pass, if
  // applicable.
  ForwardPassAllreduceWork forwardPassWorkHandle_;

  // Division factor for the reduction of gradients. Equal to the process
  // group size, except when handling uneven input.
  int div_factor_;

  bool static_graph_;

  // Key: size_t (index), Value: the number of times that a variable's
  // autograd_hook() should be triggered before marking this variable's grad
  // as ready for communication. Map will not change after the 1st iteration.
  std::unordered_map<size_t, int> numGradHooksTriggeredMap_;
  // Key: size_t (index), Value: the number of times that a variable's
  // autograd_hook() remains to be triggered before marking this variable's
  // grad as ready for communication. Map will change after the 1st iteration
  // to track whether a grad is ready for communication.
  std::unordered_map<size_t, int> numGradHooksTriggeredMapPerIteration_;

 private:
  // reset counting for buckets before backward starts
  void reset_bucket_counting();
  // search unused parameters before backward starts
  void search_unused_parameters(
      const std::vector<torch::autograd::Variable>& outputs);
  void set_divide_factor();
  // kick off all reduce for the ready bucket
  void all_reduce_bucket(Bucket& bucket);
  // kick off allreduce of the local used map; it helps find globally unused
  // parameters
  void all_reduce_local_used_map();
  // initialize locally used parameter maps
  void initialize_local_used_map();
  // get current cuda stream
  const c10::Stream get_current_stream();
  bool dynamic_graph_find_unused();
  bool static_graph_first_iteration();
  bool static_graph_after_first_iteration();

  // comm_hook_ is used to access the DDP communication hook if registered.
  std::unique_ptr<CommHookInterface> comm_hook_;
  // Debug level setting. It is parsed once when Reducer is constructed, and
  // remains the same across a single invocation of DDP training.
  DebugLevel ddp_debug_level_;
  // Mapping of variable index to its fully qualified parameter name, used to
  // notify users when certain parameters do not receive gradients.
  std::unordered_map<size_t, std::string> param_names_;
  // Variable indices stored sequentially in the order in which the gradient
  // is ready for the current backward pass.
  std::vector<int> grad_ready_order_indices_;
  // Byte capacity of the first bucket; can be configured by the user.
  int64_t first_bucket_bytes_cap_;
  // Per iteration set of parameter indices that have been marked ready.
  std::unordered_set<size_t> perIterationReadyParams_;
  // Retrieves parameter names that have not been marked as ready as part of
  // the previous iteration.
  std::vector<std::string> getUnmarkedParamsForIteration();
  // Retrieves parameter indices that have not been marked as ready as part of
  // the previous iteration.
  std::vector<size_t> getUnmarkedParamIndicesForIteration();
  // Raises appropriate error if mark_variable_ready is called on the same
  // variable twice, which is unexpected.
  void checkAndRaiseMarkedTwiceError(size_t curVariableIndex);
  // Retrieves parameter corresponding to the given VariableIndex.
  at::Tensor& get_param_from_index(size_t index);

  // Cached bucket index to model parameter mapping. Populated after buckets
  // are rebuilt, after which this mapping is static.
  mutable std::unordered_map<size_t, std::vector<at::Tensor>> cached_variables_for_bucket_;

  friend class Logger;
};

// This is equivalent to take_tensors but returns indices into the
// tensor list argument for bucket assignment. It is also aware
// of device placement and will not allow buckets to span devices.
// If tensor_indices is non-empty, the index assigned to tensors[i] is
// tensor_indices[i]; otherwise the index assigned to tensors[i] is i.
TORCH_API std::tuple<std::vector<std::vector<size_t>>, std::vector<size_t>>
compute_bucket_assignment_by_size(
    const std::vector<at::Tensor>& tensors,
    const std::vector<size_t>& bucket_size,
    const std::vector<bool>& expect_sparse_gradient = {},
    const std::vector<int64_t>& tensor_indices = {},
    const c10::optional<std::weak_ptr<c10d::Logger>>& logger = {});
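
// Illustrative usage of compute_bucket_assignment_by_size (a sketch, assuming
// a vector of parameter tensors that share a device and dtype, and the
// default caps defined above):
//
//   auto [bucket_indices, per_bucket_limits] =
//       compute_bucket_assignment_by_size(
//           tensors, {kDefaultFirstBucketBytes, kDefaultBucketBytesCap});
//
// The small first cap lets the initial bucket start reducing early in the
// backward pass; subsequent buckets grow up to the 25 MiB cap.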

// Verifies that the model across all processes is the same as the model on
// rank 0 with respect to the number of params and matching dtype/size/layout.
TORCH_API void verify_params_across_processes(
    const c10::intrusive_ptr<c10d::ProcessGroup>& process_group,
    const std::vector<at::Tensor>& params,
    const c10::optional<std::weak_ptr<c10d::Logger>>& logger);
} // namespace c10d