Why Gemfury? Push, build, and install  RubyGems npm packages Python packages Maven artifacts PHP packages Go Modules Debian packages RPM packages NuGet packages

Repository URL to install this package:

Details    
torch / include / c10 / xpu / XPUStream.h
Size: Mime:
#pragma once

#include <c10/core/Stream.h>
#include <c10/core/impl/GPUTrace.h>
#include <c10/xpu/XPUFunctions.h>

namespace c10::xpu {

/*
 * Note [Stream Management]
 *
 * An XPUStream is an abstraction of an actual SYCL queue in which SYCL kernel
 * can execute. Currently, there are several pools per device to manage SYCL
 * queue, and a device's pool is lazily created.
 *
 * There are two pools per device. The first pool contains "normal priority"
 * queues. The second pool is the "high priority" queues. There are 32 queues in
 * per pool per device, and when a queue is requested one of these queues is
 * returned round-robin. That is, the first queue requested is at index 0, the
 * second at index 1... to index 31, then index 0 again.
 *
 * This means that if 33 queues are requested, the first and last queues
 * requested are actually the same queue (under the covers) and kernels enqueued
 * on them cannot run concurrently.
 *
 * It is safe to enqueue a kernel on the same queue from two different
 * threads as the SYCL specification described.
 */

static constexpr int max_compile_time_stream_priorities = 2;

/*
 * This serves as a wrapper around c10::Stream and acts as a representation for
 * a SYCL queue, which allows asynchronous execution of XPU tasks.
 */
class C10_XPU_API XPUStream {
 public:
  enum Unchecked { UNCHECKED };

  /// Construct a XPUStream from a Stream. This construction is checked, and
  /// will raise an error if the Stream is not, in fact, a XPU stream.
  explicit XPUStream(Stream stream) : stream_(stream) {
    TORCH_CHECK(stream_.device_type() == DeviceType::XPU);
  }

  /// Construct a XPUStream from a Stream with no error checking.
  explicit XPUStream(Unchecked, Stream stream) : stream_(stream) {}

  bool operator==(const XPUStream& other) const noexcept {
    return unwrap() == other.unwrap();
  }

  bool operator!=(const XPUStream& other) const noexcept {
    return unwrap() != other.unwrap();
  }

  /// Implicit conversion to sycl::queue&.
  operator sycl::queue&() const {
    return queue();
  }

  /// Implicit conversion to Stream (a.k.a., forget that the stream is a
  /// XPU stream).
  operator Stream() const {
    return unwrap();
  }

  /// Get the XPU device type that this stream is associated with.
  DeviceType device_type() const {
    return DeviceType::XPU;
  }

  /// Get the XPU device index that this stream is associated with.
  DeviceIndex device_index() const {
    return stream_.device_index();
  }

  /// Get the full Device that this stream is associated with. The Device is
  /// guaranteed to be a XPU device.
  Device device() const {
    return Device(DeviceType::XPU, device_index());
  }

  /// Return the stream ID corresponding to this particular stream. StreamId is
  /// a int64_t representation generated by its type and index.
  StreamId id() const {
    return stream_.id();
  }

  /// Return true if all enqueued tasks in this stream have been completed,
  /// otherwise return false.
  bool query() const {
    return queue().ext_oneapi_empty();
  }

  /// Performs a blocking wait for the completion of all enqueued tasks in this
  /// stream.
  void synchronize() const {
    queue().wait_and_throw();
    const c10::impl::PyInterpreter* interp = c10::impl::GPUTrace::get_trace();
    if (C10_UNLIKELY(interp)) {
      (*interp)->trace_gpu_stream_synchronization(
          c10::kXPU, reinterpret_cast<uintptr_t>(&queue()));
    }
  }

  /// Return the priority that this stream is associated with. Lower numbers
  /// represent higher priority.
  int priority() const;

  /// Explicit conversion to sycl::queue&.
  sycl::queue& queue() const;

  /// Explicit conversion to Stream.
  Stream unwrap() const {
    return stream_;
  }

  /// Reversibly pack a XPUStream into a struct representation. The XPUStream
  /// can be unpacked using unpack3().
  struct c10::StreamData3 pack3() const {
    return stream_.pack3();
  }

  /// Unpack a XPUStream from the 3 fields generated by pack3().
  static XPUStream unpack3(
      StreamId stream_id,
      DeviceIndex device_index,
      DeviceType device_type) {
    return XPUStream(Stream::unpack3(stream_id, device_index, device_type));
  }

  /// Return the range of priority **supported by PyTorch**.
  static std::tuple<int, int> priority_range() {
    return std::make_tuple(0, -max_compile_time_stream_priorities + 1);
  }

 private:
  Stream stream_;
};

/**
 * Get a stream from the pool in a round-robin fashion.
 *
 * You can request a stream from the highest priority pool by setting
 * isHighPriority to true for a specific device.
 */
C10_XPU_API XPUStream
getStreamFromPool(const bool isHighPriority = false, DeviceIndex device = -1);

/**
 * Get a stream from the pool in a round-robin fashion.
 *
 * You can request a stream by setting a priority value for a specific device.
 * The priority number lower, the priority higher.
 */
C10_XPU_API XPUStream
getStreamFromPool(const int priority, DeviceIndex device = -1);

/**
 * Get the current XPU stream, for the passed XPU device, or for the current
 * device if no device index is passed.
 */
C10_XPU_API XPUStream getCurrentXPUStream(DeviceIndex device = -1);

/**
 * Set the current stream on the device of the passed in stream to be the passed
 * in stream.
 */
C10_XPU_API void setCurrentXPUStream(XPUStream stream);

C10_XPU_API std::ostream& operator<<(std::ostream& stream, const XPUStream& s);

/**
 * Block all reserved SYCL queues in the stream pools on the device, and wait
 * for their synchronizations.
 */
C10_XPU_API void syncStreamsOnDevice(DeviceIndex device = -1);

} // namespace c10::xpu

namespace std {
template <>
struct hash<c10::xpu::XPUStream> {
  size_t operator()(c10::xpu::XPUStream s) const noexcept {
    return std::hash<c10::Stream>{}(s.unwrap());
  }
};
} // namespace std