Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
torch / include / torch / csrc / distributed / c10d / Utils.hpp
Size: Mime:
#pragma once

#include <ATen/ATen.h>
#include <c10/util/accumulate.h>
#include <c10/util/irange.h>
#include <torch/csrc/distributed/c10d/Types.hpp>

#ifdef _WIN32
#include <winsock2.h>
#include <ws2tcpip.h>
typedef SSIZE_T ssize_t;
#pragma comment(lib, "Ws2_32.lib")
#else
#include <fcntl.h>
#include <netdb.h>
#include <sys/poll.h>
#include <sys/socket.h>
#include <unistd.h>
#endif

#include <sys/types.h>

#include <chrono>
#include <cstdint>
#include <cstdlib>
#include <functional>
#include <limits>
#include <string>
#include <system_error>
#include <tuple>
#include <vector>

namespace c10d {

TORCH_API std::string parse_env(const char* env_var_name);

// Retrieve tensor shapes from a given tensor.
TORCH_API std::vector<at::Tensor> getTensorShapes(
    const std::vector<at::Tensor>& tensors);

// Use -2 to represent unset state of env vars
#define C10D_ENV_NOT_SET -2

// Turns at::IntArrayRef into "(1, 2, 3, 4)".
inline std::string toString(at::IntArrayRef l) {
  std::stringstream ss;
  ss << "(";
  for (const auto i : c10::irange(l.size())) {
    if (i > 0) {
      ss << ", ";
    }
    ss << l[i];
  }
  ss << ")";
  return ss.str();
}

inline std::string toString(const c10::Layout& layout) {
  std::stringstream ss;
  ss << layout;
  return ss.str();
}

inline void assertSameType(
    const at::DeprecatedTypeProperties& type,
    const std::vector<at::Tensor>& tensors) {
  for (const auto i : c10::irange(tensors.size())) {
    if (!tensors[i].options().type_equal(type.options())) {
      const std::string expected = type.toString();
      const std::string actual = tensors[i].toString();
      throw std::invalid_argument(
          "mixed types (" + expected + " and " + actual + ")");
    }
  }
}

inline std::vector<std::string> split(
    char separator,
    const std::string& string) {
  std::vector<std::string> pieces;
  std::stringstream ss(string);
  std::string item;
  while (std::getline(ss, item, separator)) {
    pieces.push_back(std::move(item));
  }
  return pieces;
}

inline int parseEnvVarInt(const char* envVarName) {
  char* stringValue = std::getenv(envVarName);
  if (stringValue != nullptr) {
    int val;
    try {
      val = std::stoi(stringValue);
    } catch (std::exception& e) {
      TORCH_CHECK(
          false,
          "Invalid value for environment variable: " + std::string(envVarName));
    }
    return val;
  }
  return C10D_ENV_NOT_SET;
}

inline const char* parseEnvVarString(
    const char* envVarName,
    const char* default_val) {
  const char* val = std::getenv(envVarName);
  if (val == nullptr) {
    val = default_val;
  }
  return val;
}

inline int parseEnvVarIntDefault(const char* envVarName, int defaultVal) {
  int val = parseEnvVarInt(envVarName);
  if (val == C10D_ENV_NOT_SET)
    return defaultVal;
  return val;
}

inline bool parseEnvVarFlag(const char* envVarName) {
  int val = parseEnvVarInt(envVarName);
  if (val == 1) {
    return true;
  } else if (val == 0 || val == C10D_ENV_NOT_SET) {
    return false;
  }
  TORCH_CHECK(
      false,
      "Invalid value for environment variable: " + std::string(envVarName));
  return false;
}

inline void assertSameSizes(
    const at::IntArrayRef& sizes,
    const std::vector<at::Tensor>& tensors) {
  for (const auto i : c10::irange(tensors.size())) {
    if (!tensors[i].sizes().equals(sizes)) {
      const auto expected = toString(sizes);
      const auto actual = toString(tensors[i].sizes());
      throw std::invalid_argument(
          "mixed sizes (" + expected + " and " + actual + ")");
    }
  }
}

inline void assertSameSizeAndType(const std::vector<at::Tensor>& tensors) {
  // Ensure we have at least one tensor
  if (tensors.empty()) {
    throw std::invalid_argument("argument is empty");
  }

  // Ensure all tensors have identical type and shape
  auto options = tensors[0].options();
  auto sizes = tensors[0].sizes();
  for (const auto i : c10::irange(1, tensors.size())) {
    if (!tensors[i].options().type_equal(options)) {
      const auto expected = toString(options);
      const auto actual = toString(tensors[i].options());
      throw std::invalid_argument(
          "argument contains mixed types (" + expected + " and " + actual +
          ")");
    }
    if (!tensors[i].sizes().equals(sizes)) {
      const auto expected = toString(sizes);
      const auto actual = toString(tensors[i].sizes());
      throw std::invalid_argument(
          "argument contains mixed sizes (" + expected + " and " + actual +
          ")");
    }
  }
}

inline void assertTypeMatch(
    std::function<void(const std::string&)> fn,
    const at::DeprecatedTypeProperties& type,
    const at::ArrayRef<at::Tensor> tensors,
    size_t index) {
  if (!tensors[index].options().type_equal(type.options())) {
    fn("invalid tensor type at index " + std::to_string(index) + " (expected " +
       type.toString() + ", got " + tensors[index].toString() + ")");
  }
}

inline void assertTypeMatch(
    std::function<void(const std::string&)> fn,
    const at::TensorOptions& options,
    const at::ArrayRef<at::Tensor> tensors,
    size_t index) {
  if (!tensors[index].options().type_equal(options)) {
    fn("invalid tensor type at index " + std::to_string(index) + " (expected " +
       toString(options) + ", got " + toString(tensors[index].options()) + ")");
  }
}

inline void assertSizesMatch(
    std::function<void(const std::string&)> fn,
    const at::IntArrayRef& sizes,
    const at::ArrayRef<at::Tensor> tensors,
    size_t index) {
  if (tensors[index].sizes() != sizes) {
    fn("invalid tensor size at index " + std::to_string(index) + " (expected " +
       toString(sizes) + ", got " + toString(tensors[index].sizes()) + ")");
  }
}

inline void assertLayoutMatch(
    std::function<void(const std::string&)> fn,
    const c10::Layout& expected,
    const at::ArrayRef<at::Tensor> tensors,
    size_t index) {
  const auto& actual = tensors[index].layout();
  if (actual != expected) {
    fn("invalid tensor layout at index " + std::to_string(index) +
       " (expected " + toString(expected) + ", got " + toString(actual) + ")");
  }
}

inline void assertLayoutMatch(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  const auto& layout = tensors[0].layout();
  for (const auto i : c10::irange(1, tensors.size())) {
    assertLayoutMatch(fn, layout, tensors, i);
  }
}

inline void assertNonEmpty(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  if (tensors.empty()) {
    fn("requires non-empty tensor list");
  }
}

inline void assertSingleElement(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  if (tensors.size() != 1) {
    fn("requires a single-element tensor list");
  }
}

inline void assertSingleElementInput(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  if (tensors.size() != 1) {
    fn("requires a single-element input tensor list");
  }
}

inline void assertSingleElementOutput(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  if (tensors.size() != 1) {
    fn("requires a single-element output tensor list");
  }
}

inline void assertRootRank(
    std::function<void(const std::string&)> fn,
    int rank,
    int size) {
  if (rank < 0 || rank >= size) {
    fn("invalid root rank: " + std::to_string(rank));
  }
}

inline void assertRootTensor(
    std::function<void(const std::string&)> fn,
    int rank,
    int size) {
  if (rank < 0 || rank >= size) {
    fn("invalid root tensor: " + std::to_string(rank));
  }
}

inline void assertDense(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  const auto& layout = tensors[0].layout();
  if (layout != at::kStrided) {
    fn("only supports dense tensors");
  }
}

inline void assertCPU(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  const auto& device = tensors[0].device();
  if (device.type() != at::kCPU) {
    fn("only supports CPU tensors");
  }
}

inline void assertSameDevice(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  if (tensors.size() < 2) {
    return;
  }
  const auto& device = tensors[0].device();
  for (const auto i : c10::irange(1, tensors.size())) {
    if (tensors[i].device() != device) {
      fn("tensors should be on the same device");
    }
  }
}

inline void assertTypeAndSizesMatch(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors,
    const at::DeprecatedTypeProperties& type,
    const at::IntArrayRef& sizes) {
  for (const auto i : c10::irange(tensors.size())) {
    assertTypeMatch(fn, type, tensors, i);
    assertSizesMatch(fn, sizes, tensors, i);
  }
}

inline void assertTypeAndSizesMatch(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors,
    const at::TensorOptions& options,
    const at::IntArrayRef& sizes) {
  for (const auto i : c10::irange(tensors.size())) {
    assertTypeMatch(fn, options, tensors, i);
    assertSizesMatch(fn, sizes, tensors, i);
  }
}

inline void assertTypeAndSizesMatch(
    std::function<void(const std::string&)> fn,
    const at::ArrayRef<at::Tensor> tensors) {
  const auto& options = tensors[0].options();
  const auto sizes = tensors[0].sizes();
  assertTypeAndSizesMatch(fn, tensors.slice(1), options, sizes);
}

// Copied from ATen/core/functional.h.
template <typename F, typename T>
inline auto fmap(T& inputs, const F& fn)
    -> std::vector<decltype(fn(*inputs.begin()))> {
  std::vector<decltype(fn(*inputs.begin()))> r;
  r.reserve(inputs.size());
  for (auto& input : inputs) {
    r.push_back(fn(input));
  }
  return r;
}

// Copied from torch/csrc/utils/tensor_flatten.h.
inline at::Tensor flattenDenseTensors(at::TensorList tensors) {
  static const auto flatten = [](const at::Tensor& t) {
    return t.contiguous().view({-1});
  };
  if (tensors.size() == 1) {
    return flatten(tensors[0]);
  }
  return at::cat(::c10d::fmap(tensors, flatten));
}

inline at::Tensor newLikeFlat(
    std::vector<std::vector<at::Tensor>>& tensors,
    size_t deviceIdx) {
  if (tensors.empty() || tensors[0].empty()) {
    TORCH_CHECK(false, "Received an empty list");
  }
  if (deviceIdx >= tensors.size()) {
    TORCH_CHECK(false, "Invalid device index");
  }
  auto& t = tensors[deviceIdx][0];
  auto device = t.device();
  for (const auto i : c10::irange(1, tensors[deviceIdx].size())) {
    if (tensors[deviceIdx][i].device() != device) {
      TORCH_CHECK(false, "Expecting all tensors on the same device");
    }
  }
  at::DeviceGuard gpuGuard(device);
  std::vector<int64_t> sizes{static_cast<int64_t>(tensors[deviceIdx].size())};
  std::vector<int64_t> strides{static_cast<int64_t>(t.numel())};
  sizes.insert(sizes.end(), t.sizes().begin(), t.sizes().end());
  strides.insert(strides.end(), t.strides().begin(), t.strides().end());
  return at::empty_strided(
      sizes, strides, t.options().memory_format(c10::nullopt));
}

inline at::Tensor newLikeFlat(std::vector<at::Tensor>& tensors) {
  if (tensors.empty()) {
    TORCH_CHECK(false, "Received an empty list");
  }
  auto& t = tensors[0];
  at::DeviceGuard gpuGuard(t.device());
  std::vector<int64_t> sizes{static_cast<int64_t>(tensors.size())};
  sizes.insert(sizes.end(), t.sizes().begin(), t.sizes().end());
  return at::empty(sizes, t.options());
}

inline std::vector<std::vector<int64_t>> getSizes(
    const std::vector<at::Tensor>& tensors) {
  std::vector<std::vector<int64_t>> sizes(tensors.size());
  for (const auto i : c10::irange(tensors.size())) {
    sizes[i] = tensors[i].sizes().vec();
  }
  return sizes;
}

inline std::vector<int> getDevices(const std::vector<at::Tensor>& tensors) {
  std::vector<int> devices(tensors.size(), -1);
  if (tensors[0].device().is_cuda()) {
    for (const auto i : c10::irange(tensors.size())) {
      devices[i] = tensors[i].storage().device().index();
    }
  }
  return devices;
}

template <typename T>
inline T* getDataPointer(const at::Tensor& tensor) {
  // This method is only used in ProcessGroupGloo for now. Call sites must make
  // sure that the input tensor is contiguous. It is OK if the tensor does not
  // start from the beginning of the storage. For example, it could come from
  // chunk(..., dim=0)[1]. Hence, we need to use data_ptr() instead of
  // tensor.storage().data()
  // NB: not using tensor.data<T>() because tensor is not aware of gloo::TYPE
  return static_cast<T*>(tensor.data_ptr());
}

template <typename T>
std::vector<T*> getDataPointers(const std::vector<at::Tensor>& tensors) {
  std::vector<T*> ptrs(tensors.size());
  for (const auto i : c10::irange(tensors.size())) {
    ptrs[i] = getDataPointer<T>(tensors[i]);
  }
  return ptrs;
}

// For alltoall split size sanity check
inline void checkSplitSizes(
    const std::vector<int64_t>& split_sizes,
    const at::Tensor& tensor,
    int group_size) {
  if (split_sizes.empty()) {
    TORCH_CHECK(
        tensor.size(0) % group_size == 0,
        "Tensor's dim 0 does not divide equally across group size");
  } else {
    TORCH_CHECK(
        split_sizes.size() == static_cast<size_t>(group_size),
        "Number of tensor splits not equal to group size");
    const auto sum = c10::sum_integers(split_sizes);
    TORCH_CHECK(
        sum == tensor.size(0), "Split sizes doesn't match total dim 0 size");
  }
}

// Compute alltoall lengths and offsets, handling multi-dimension tensors
template <typename T>
size_t computeLengthsAndOffsets(
    const std::vector<int64_t>& split_sizes,
    const at::Tensor& tensor,
    std::vector<T>* lengths,
    std::vector<T>* offsets) {
  size_t group_size = lengths->size();
  bool equal_splits = false;
  size_t dim0_size = tensor.size(0);
  size_t row_size = (dim0_size ? tensor.numel() / dim0_size : 1);
  size_t split_size = 0;
  size_t offset = 0;

  if (split_sizes.empty()) {
    equal_splits = true;
    split_size = tensor.size(0) / group_size;
  }
  for (const auto i : c10::irange(group_size)) {
    size_t length = row_size * (equal_splits ? split_size : split_sizes[i]);
    (*lengths)[i] = length;
    (*offsets)[i] = offset;
    // TODO: see if we should add overflow protection for offset
    offset += length;
  }
  return offset;
}

template <typename T>
size_t computeLengthsAndOffsets(
    const std::vector<at::Tensor>& tensors,
    std::vector<T>* lengths,
    std::vector<T>* offsets) {
  size_t group_size = lengths->size();
  size_t offset = 0;
  for (const auto i : c10::irange(group_size)) {
    size_t length = tensors[i].numel();
    (*lengths)[i] = length;
    (*offsets)[i] = offset;
    offset += length;
  }
  return offset;
}

using RankType = uint32_t;
using SizeType = uint64_t;

// `errno` is only meaningful when it fails. E.g., a  successful `fork()` sets
// `errno` to `EINVAL` in child process on some macos
// (https://stackoverflow.com/a/20295079), and thus `errno` should really only
// be inspected if an error occurred.
//
// `success_cond` is an expression used to check if an error has happend. So for
// `fork()`, we can use `SYSCHECK(pid = fork(), pid != -1)`. The function output
// is stored in variable `__output` and may be used in `success_cond`.
#ifdef _WIN32
#define SYSCHECK(expr, success_cond)                                      \
  while (true) {                                                          \
    auto __output = (expr);                                               \
    auto errno_local = WSAGetLastError();                                 \
    (void)__output;                                                       \
    if (!(success_cond)) {                                                \
      if (errno == EINTR) {                                               \
        continue;                                                         \
      } else if (                                                         \
          errno_local == WSAETIMEDOUT || errno_local == WSAEWOULDBLOCK) { \
        TORCH_CHECK(false, "Socket Timeout");                             \
      } else {                                                            \
        throw std::system_error(errno_local, std::system_category());     \
      }                                                                   \
    } else {                                                              \
      break;                                                              \
    }                                                                     \
  }
#else
#define SYSCHECK(expr, success_cond)                            \
  while (true) {                                                \
    auto __output = (expr);                                     \
    (void)__output;                                             \
    if (!(success_cond)) {                                      \
      if (errno == EINTR) {                                     \
        continue;                                               \
      } else if (errno == EAGAIN || errno == EWOULDBLOCK) {     \
        TORCH_CHECK(false, "Socket Timeout");                   \
      } else {                                                  \
        throw std::system_error(errno, std::system_category()); \
      }                                                         \
    } else {                                                    \
      break;                                                    \
    }                                                           \
  }
#endif

// Most functions indicate error by returning `-1`. This is a helper macro for
// this common case with `SYSCHECK`.
// Since SOCKET_ERROR = -1 in MSVC, so also leverage SYSCHECK_ERR_RETURN_NEG1
#define SYSCHECK_ERR_RETURN_NEG1(expr) SYSCHECK(expr, __output != -1)

namespace tcputil {

// Send and receive
template <typename T>
void sendBytes(
    int socket,
    const T* buffer,
    size_t length,
    bool moreData = false) {
  size_t bytesToSend = sizeof(T) * length;
  if (bytesToSend == 0) {
    return;
  }

  auto bytes = reinterpret_cast<const uint8_t*>(buffer);
  uint8_t* currentBytes = const_cast<uint8_t*>(bytes);

  int flags = 0;

#ifdef MSG_MORE
  if (moreData) { // there is more data to send
    flags |= MSG_MORE;
  }
#endif

// Ignore SIGPIPE as the send() return value is always checked for error
#ifdef MSG_NOSIGNAL
  flags |= MSG_NOSIGNAL;
#endif

  while (bytesToSend > 0) {
    ssize_t bytesSent;
    SYSCHECK_ERR_RETURN_NEG1(
        bytesSent =
            ::send(socket, (const char*)currentBytes, bytesToSend, flags))
    if (bytesSent == 0) {
      throw std::system_error(ECONNRESET, std::system_category());
    }

    bytesToSend -= bytesSent;
    currentBytes += bytesSent;
  }
}

template <typename T>
void recvBytes(int socket, T* buffer, size_t length) {
  size_t bytesToReceive = sizeof(T) * length;
  if (bytesToReceive == 0) {
    return;
  }

  auto bytes = reinterpret_cast<uint8_t*>(buffer);
  uint8_t* currentBytes = bytes;

  while (bytesToReceive > 0) {
    ssize_t bytesReceived;
    SYSCHECK_ERR_RETURN_NEG1(
        bytesReceived = recv(socket, (char*)currentBytes, bytesToReceive, 0))
    if (bytesReceived == 0) {
      throw std::system_error(ECONNRESET, std::system_category());
    }

    bytesToReceive -= bytesReceived;
    currentBytes += bytesReceived;
  }
}

// send a vector's length and data
template <typename T>
void sendVector(int socket, const std::vector<T>& vec, bool moreData = false) {
  SizeType size = vec.size();
  sendBytes<SizeType>(socket, &size, 1, true);
  sendBytes<T>(socket, vec.data(), size, moreData);
}

// receive a vector as sent in sendVector
template <typename T>
std::vector<T> recvVector(int socket) {
  SizeType valueSize;
  recvBytes<SizeType>(socket, &valueSize, 1);
  std::vector<T> value(valueSize);
  recvBytes<T>(socket, value.data(), value.size());
  return value;
}

// this is only for convenience when sending rvalues
template <typename T>
void sendValue(int socket, const T& value, bool moreData = false) {
  sendBytes<T>(socket, &value, 1, moreData);
}

template <typename T>
T recvValue(int socket) {
  T value;
  recvBytes<T>(socket, &value, 1);
  return value;
}

// send a string's length and data
inline void sendString(
    int socket,
    const std::string& str,
    bool moreData = false) {
  SizeType size = str.size();
  sendBytes<SizeType>(socket, &size, 1, true);
  sendBytes<char>(socket, str.data(), size, moreData);
}

// receive a string as sent in sendString
inline std::string recvString(int socket) {
  SizeType valueSize;
  recvBytes<SizeType>(socket, &valueSize, 1);
  std::vector<char> value(valueSize);
  recvBytes<char>(socket, value.data(), value.size());
  return std::string(value.data(), value.size());
}

} // namespace tcputil
} // namespace c10d