Learn more  » Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Bower components Debian packages RPM packages NuGet packages

neilisaac / torch   python

Repository URL to install this package:

Version: 1.8.0 

/ include / caffe2 / operators / partition_ops.h

#ifndef CAFFE2_OPERATORS_PARTITION_OPS_H_
#define CAFFE2_OPERATORS_PARTITION_OPS_H_

#include "caffe2/core/context.h"
#include "caffe2/core/operator.h"

namespace caffe2 {

template <typename Index>
static inline int moduloPartition(Index key, int numPartitions) {
  int shard = key % numPartitions;
  // equivalent to `if (shard < 0) shard += partitions;`
  shard += numPartitions & (shard >> (sizeof(int) * 8 - 1));
  return shard;
}

class GatherByKeyOp : public Operator<CPUContext> {
 public:
  USE_DISPATCH_HELPER;
  USE_OPERATOR_FUNCTIONS(CPUContext);
  template <class... Args>
  explicit GatherByKeyOp(Args&&... args)
      : Operator<CPUContext>(std::forward<Args>(args)...) {}

 private:
  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    const auto numPartitions = InputSize() - 1;
    CAFFE_ENFORCE_GE(numPartitions, 1);
    const auto& keysTensor = Input(0);
    const auto* keysData = keysTensor.template data<Index>();
    const auto& keysShape = Input(0).sizes();
    CAFFE_ENFORCE_EQ(
        keysShape.size(), 1, "Only 1D keys tensor supported currently.");

    // 1. Shape and type consistency checks
    const auto& in0Shape = Input(1).sizes();
    CAFFE_ENFORCE_GE(in0Shape.size(), 1);

    vector<int64_t> outShape(keysShape.vec());
    outShape.insert(outShape.end(), in0Shape.begin() + 1, in0Shape.end());

    CAFFE_ENFORCE_GE(outShape.size(), 1);
    auto totalSize = in0Shape[0];
    auto meta = Input(1).dtype();
    for (int i = 2; i < InputSize(); ++i) {
      const auto& input = Input(i);
      CAFFE_ENFORCE(meta == input.dtype());
      CAFFE_ENFORCE_GE(input.dim(), 1);
      CAFFE_ENFORCE(std::equal(
          outShape.begin() + keysShape.size(),
          outShape.end(),
          input.sizes().begin() + 1));
      totalSize += input.size(0);
    }
    CAFFE_ENFORCE_EQ(keysTensor.numel(), totalSize);

    auto* outTensor = Output(0);
    outTensor->Resize(outShape);
    auto* outData = static_cast<char*>(outTensor->raw_mutable_data(meta));
    const auto blockSize = outTensor->size_from_dim(1);

    inputDatas_.resize(numPartitions);
    for (int i = 0; i < numPartitions; ++i) {
      inputDatas_[i] = static_cast<const char*>(Input(i + 1).raw_data());
    }
    inStartOffsets_.assign(numPartitions, 0);
    Index outStartOffset = 0;
    int currentShard = -1;

    // 2. copy from inputs into output based on shard for each input key
    const auto numEntries = keysTensor.numel();
    for (int64_t i = 0; i <= numEntries; ++i) {
      auto newShard =
          i < numEntries ? moduloPartition(keysData[i], numPartitions) : -1;
      if (newShard != currentShard) {
        if (currentShard != -1) {
          auto inStartOffset = inStartOffsets_[currentShard];
          auto numItems = i - outStartOffset;
          context_.CopyItemsSameDevice(
              meta,
              numItems * blockSize,
              inputDatas_[currentShard] +
                  inStartOffset * blockSize * meta.itemsize(),
              outData + outStartOffset * blockSize * meta.itemsize());
          inStartOffsets_[currentShard] += numItems;
        }
        currentShard = newShard;
        outStartOffset = i;
      }
    }

    return true;
  }

  std::vector<const char*> inputDatas_;
  std::vector<int64_t> inStartOffsets_;
};

class PartitionOpBase : public Operator<CPUContext> {
 public:
  USE_OPERATOR_FUNCTIONS(CPUContext);

  template <class... Args>
  explicit PartitionOpBase(Args&&... args)
      : Operator<CPUContext>(std::forward<Args>(args)...),
        OP_SINGLE_ARG(int, "pack_first_input", pack_first_input_, 0) {}

 protected:
  template <typename Index>
  void ApplyPartition(bool skipFirstArgument) {
    CAFFE_ENFORCE_EQ(
        OutputSize() % InputSize(),
        0,
        "Output number must be a multiple of input number");
    int partitions = OutputSize() / InputSize();
    int inputSize = InputSize();
    int mainInputIndex = skipFirstArgument;
    CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");

    auto& main_input = Input(mainInputIndex);
    int64_t size = main_input.numel();
    const Index* data = main_input.template data<Index>();
    counts_.assign(partitions, 0);
    for (int64_t p = 0; p < size; p++) {
      int shard = moduloPartition(data[p], partitions);
      ++counts_[shard];
    }

    raw_datas_.resize(inputSize);
    block_sizes_.resize(inputSize);
    metas_.resize(inputSize);
    out_datas_.resize(OutputSize());
    for (int i = mainInputIndex; i < inputSize; ++i) {
      auto& input = Input(i);
      if (i > mainInputIndex) {
        CAFFE_ENFORCE_GE(
            input.dim(),
            main_input.dim(),
            "Prefix of extra input's shape must match main input's shape, ",
            "input: ",
            i);
        for (int j = 0; j < main_input.dim(); ++j) {
          CAFFE_ENFORCE_GE(
              input.size(j),
              main_input.size(j),
              "Prefix of extra input's shape must match main input's shape, ",
              "input: ",
              i,
              ", dim ",
              j);
        }
      }
      raw_datas_[i] = input.raw_data();
      block_sizes_[i] = input.size_from_dim(main_input.dim());
      metas_[i] = input.dtype();
      // shape = partition_size + suffix of input dims
      vector<int64_t> shape(
          input.sizes().begin() + main_input.dim() - 1, input.sizes().end());
      for (int j = 0; j < partitions; ++j) {
        int out_idx = i + j * inputSize;
        auto output = Output(out_idx);
        shape[0] = counts_[j];
        output->Resize(shape);
        out_datas_[out_idx] = output->raw_mutable_data(input.dtype());
      }
    }

    counts_.assign(partitions, 0);
    for (int64_t p = 0; p < size; p++) {
      int shard = moduloPartition(data[p], partitions);
      int64_t idx = counts_[shard]++;

      // special case first input
      static_cast<Index*>(out_datas_[shard * inputSize + mainInputIndex])[idx] =
          pack_first_input_ ? ((data[p] - shard) / partitions) : data[p];

      int baseIndex = shard * inputSize;
      for (int i = mainInputIndex + 1; i < inputSize; ++i) {
        auto bs = block_sizes_[i];
        auto meta = metas_[i];
        // special case for small bs?
        context_.CopyItemsSameDevice(
            meta,
            bs,
            static_cast<const char*>(raw_datas_[i]) + p * bs * meta.itemsize(),
            static_cast<char*>(out_datas_[baseIndex + i]) +
                idx * bs * meta.itemsize());
      }
    }
  }

  bool pack_first_input_;

  // use member fields to reuse memory
  vector<int64_t> counts_;
  vector<int64_t> block_sizes_;
  vector<TypeMeta> metas_;
  vector<const void*> raw_datas_;
  vector<void*> out_datas_;
};

class PartitionOp : public PartitionOpBase {
 public:
  USE_DISPATCH_HELPER;

  template <class... Args>
  explicit PartitionOp(Args&&... args)
      : PartitionOpBase(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(0));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    ApplyPartition<Index>(false /* skipFirstArgument */);
    return true;
  }

  C10_DISABLE_COPY_AND_ASSIGN(PartitionOp);
};

class LengthsPartitionOp : public PartitionOpBase {
 public:
  USE_DISPATCH_HELPER;

  template <class... Args>
  explicit LengthsPartitionOp(Args&&... args)
      : PartitionOpBase(std::forward<Args>(args)...) {}

  bool RunOnDevice() override {
    return DispatchHelper<TensorTypes<int32_t, int64_t>>::call(this, Input(1));
  }

 private:
  template <typename Index>
  bool DoRunWithType() {
    CAFFE_ENFORCE(
        OutputSize() % InputSize() == 0,
        "Output number must be a multiple of input number");
    int partitions = OutputSize() / InputSize();
    CAFFE_ENFORCE_GT(partitions, 0, "Invalid number of partitions");
    CAFFE_ENFORCE_EQ(
        Input(1).dim(),
        1,
        "Only 1-D tensors supported as a partitioning tensor for sharding");

    if (partitions == 1) {
      // Specialization when partitions == 1 which just becomes a copy.
      for (int i = 0; i < InputSize(); ++i) {
        auto& input = Input(i);
        auto& output = *Output(i);
        output.ResizeLike(input);
        context_.CopyItemsSameDevice(
            input.dtype(),
            input.numel(),
            input.raw_data(),
            output.raw_mutable_data(input.dtype()));
      }
      return true;
    }

    // Apply sharding to all parameters except lengths
    ApplyPartition<Index>(true /* skipFirstArgument */);

    // Compute lengths after sharding
    auto& main_input = Input(1);
    int64_t size = main_input.numel();
    const Index* data = main_input.template data<Index>();

    auto& length_input = Input(0);
    int64_t elements = length_input.numel();
    const int32_t* lengths_data = length_input.template data<int32_t>();
    out_length_.resize(partitions);
    for (int i = 0; i < partitions; ++i) {
      auto& output = *Output(i * InputSize());
      output.Resize(elements);
      out_length_[i] = output.template mutable_data<int32_t>();
    }

    int total_length = 0;
    for (int i = 0; i < elements; ++i) {
      total_length += lengths_data[i];
    }
    CAFFE_ENFORCE(
        total_length == size,
        "Total length is not matching to the number of elements");

    int index = 0;
    for (int i = 0; i < elements; ++i) {
      for (int j = 0; j < partitions; ++j) {
        out_length_[j][i] = 0;
      }
      for (int j = 0; j < lengths_data[i]; ++j, ++index) {
        int shard = moduloPartition(data[index], partitions);
        ++out_length_[shard][i];
      }
    }
    return true;
  }

  C10_DISABLE_COPY_AND_ASSIGN(LengthsPartitionOp);

  vector<int32_t*> out_length_;
};

} // namespace caffe2

#endif // CAFFE2_OPERATORS_PARTITION_OPS_H_