#include <c10/core/Allocator.h>
#include <c10/util/Exception.h>
#include <c10/util/Optional.h>
#include <c10/util/flat_hash_map.h>
#include <c10/util/llvmMathExtras.h>
#include <deque>
#include <mutex>
#include <optional>
#include <set>
#include <utility>
#include <vector>
namespace at {
/**
* HostBlock is the fundamental memory block used for pinned memory. It tracks
* the runtime Streams on which it has been used and the number of outstanding
* runtime Events, and serves as a base struct that each backend can inherit
* and extend.
*/
template <typename S>
struct HostBlock {
// constructor for search key
HostBlock(size_t size) : size_(size) {}
HostBlock(size_t size, void* ptr) : size_(size), ptr_(ptr) {}
std::mutex mutex_;
size_t size_{0}; // block size in bytes
void* ptr_{nullptr}; // memory address
bool allocated_{false}; // in-use flag
size_t event_count_{0}; // number of related events
ska::flat_hash_set<S> streams_; // streams on which the block was used
};
/**
* ComparatorSize provides the ordering for the set of host memory blocks,
* sorting primarily by block size so that the set supports size-based lookup.
* Ties are broken by address, so distinct blocks of equal size never compare
* equivalent and can coexist in the set.
*/
template <typename B>
struct ComparatorSize {
bool operator()(const B* a, const B* b) const {
if (a->size_ != b->size_) {
return a->size_ < b->size_;
}
return (uintptr_t)a->ptr_ < (uintptr_t)b->ptr_;
}
};
/**
* Note [HostAllocator design]
* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
* We have three key data structures - the free list which stores blocks that
* are not currently used, the block list which stores all blocks that have been
* allocated, and the event queue which stores runtime events and their
* corresponding blocks.
*
* Each of these is protected by a separate mutex. The key design principles
* are to 1) hold each mutex for the shortest time possible, and 2) never
* perform potentially expensive operations (such as CUDA runtime API calls)
* while holding a lock.
*
* There are four public methods: allocate, free, record_event and empty_cache.
* 1) In the allocate path, we first check to see if we can service the
* request from the free list, and otherwise we create a new block with
* allocate_host_memory.
* 2) In the free path, we insert events (if required) into the event queue,
* and if possible insert our block back into the free list. (In the allocate
* path, we eagerly query queued events until we find one that is not ready,
* and insert the corresponding block back into the free list once all the
* events recorded for that block are ready.)
* 3) In the record_event path, we simply insert the given stream into the set
* of streams tracked by the specified block. This set of streams is then
* consumed in the free path.
* 4) In the empty_cache path, we flush any available blocks into the free
* list, then remove every element of the free list from the block list and
* release the associated pinned memory allocation via free_block.
*
* We generalize the caching host allocator into two parts: an interface and
* an implementation. Any new backend looking to integrate with the host
* allocator and reuse the caching mechanism must specialize both parts.
*
* For the implementation, we provide a CachingHostAllocatorImpl struct
* that abstracts the caching mechanism. Any backend needs to provide a
* customized implementation by specializing its own public functions and the
* related runtime functions. Its template parameter S represents a runtime
* Stream, E denotes a runtime Event, B indicates the fundamental memory
* block, and C signifies the sorting comparator for the memory blocks.
*
* For the interface, we provide a CachingHostAllocatorInterface struct as an
* interface. Any backend needs to derive its own host allocator from this
* interface. Its template parameter T refers to an implementation that
* inherits from CachingHostAllocatorImpl.
*
* This design shares the caching mechanism across backends while giving each
* backend the flexibility to follow the default implementation or to extend
* and override it as necessary. Taking CUDA as an example, it specializes the
* runtime-related functions to reuse the caching mechanism, and additionally
* extends the allocator's functionality by adding the
* allocWithCudaHostRegister function to support page-locking the memory range
* used by CUDA. You can also refer to XPUCachingHostAllocator, a host caching
* allocator for the XPU backend, for a basic implementation.
*
* Some of the invariants here are less strict than they could be - for example,
* we do not enforce that free(Block* block) => block->event_count == 0. This is
* for compatibility reasons, and we can explore enforcing these in subsequent
* versions.
*
* Note that this caching host allocator does not split larger allocations into
* smaller blocks, unlike the caching device allocator.
*/
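/*
* A minimal sketch of the two specializations described above, using
* hypothetical MyStream/MyEvent runtime types; a real backend would call its
* own runtime APIs inside the overridden functions:
*
*   struct MyHostAllocatorImpl
*       : public CachingHostAllocatorImpl<MyStream, MyEvent> {
*     void allocate_host_memory(size_t size, void** ptr) override {
*       // allocate `size` bytes of page-locked host memory into *ptr
*     }
*     void free_block(HostBlock<MyStream>* block) override {
*       // release the pinned allocation held by block->ptr_
*     }
*     void record_stream(
*         std::optional<std::vector<MyEvent>>& events,
*         MyStream stream) override {
*       // record an event on `stream` and append it to `events`
*     }
*     bool query_event(MyEvent& event) override {
*       // return true iff `event` has completed
*     }
*   };
*
*   struct MyHostAllocator
*       : public CachingHostAllocatorInterface<MyHostAllocatorImpl> {
*     at::DataPtr allocate(size_t size) override {
*       // call impl_->allocate(size) and wrap the result in an at::DataPtr
*     }
*   };
*/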
template <
typename S,
typename E,
typename B = HostBlock<S>,
typename C = ComparatorSize<B>>
struct CachingHostAllocatorImpl {
public:
virtual ~CachingHostAllocatorImpl() = default;
// return data_ptr and block pair.
virtual std::pair<void*, void*> allocate(size_t size) {
if (size == 0) {
return {nullptr, nullptr};
}
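// Process any ready events first so that recently freed blocks can be
// returned to the free list before we search it.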
process_events();
// First, try to allocate from the free list
auto* block = get_free_block(size);
if (block) {
return {block->ptr_, reinterpret_cast<void*>(block)};
}
// Round up the allocation to the nearest power of two to improve reuse.
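// (For example, a 3000-byte request is padded to a 4096-byte block.)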
size_t roundSize = c10::llvm::PowerOf2Ceil(size);
void* ptr = nullptr;
allocate_host_memory(roundSize, &ptr);
// Then, create a new block.
block = new B(roundSize, ptr);
block->allocated_ = true;
add_allocated_block(block);
return {block->ptr_, reinterpret_cast<void*>(block)};
}
virtual void free(void* ctx) {
if (!ctx) {
return;
}
// Note: we can assume that free is correctly paired with alloc, and thus we
// do not need to look up the ctx in blocks_.
auto* block = reinterpret_cast<B*>(ctx);
std::optional<std::vector<E>> events;
{
std::lock_guard<std::mutex> g(block->mutex_);
block->allocated_ = false;
if (block->streams_.empty()) {
TORCH_INTERNAL_ASSERT(block->event_count_ == 0);
} else {
events = std::vector<E>();
events->reserve(block->streams_.size());
for (auto stream : block->streams_) {
record_stream(events, stream);
}
block->event_count_ += events->size();
block->streams_.clear();
}
}
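// A block with no recorded streams can be reused immediately; otherwise it
// must wait in the event queue until every recorded event has completed.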
if (!events) {
std::lock_guard<std::mutex> g(free_list_mutex_);
free_list_.insert(block);
} else {
// Defer reuse of the block until the events recorded on the using streams
// have completed; queue them for processing.
std::lock_guard<std::mutex> g(events_mutex_);
for (auto&& event : *events) {
events_.emplace_front(std::move(event), block);
}
}
}
virtual bool record_event(void* ptr, void* ctx, S stream) {
auto* block = reinterpret_cast<B*>(ctx);
// Note: we need to check whether the passed-in `ctx` is valid before
// proceeding. This is because `record_event` (via
// `CachingHostAllocator_recordEvent`) can be invoked on an arbitrary tensor
// and is not guaranteed to correspond to a pinned memory allocation.
{
std::lock_guard<std::mutex> g(blocks_mutex_);
if (blocks_.find(block) != blocks_.end()) {
// Now we know this object is safe to access.
std::lock_guard<std::mutex> gb(block->mutex_);
TORCH_INTERNAL_ASSERT(block->allocated_);
block->streams_.insert(stream);
return true;
}
auto it = ptr_to_block_.find(ptr);
if (it != ptr_to_block_.end()) {
block = it->second;
std::lock_guard<std::mutex> gb(block->mutex_);
TORCH_INTERNAL_ASSERT(block->allocated_);
block->streams_.insert(stream);
return true;
}
}
return false;
}
virtual void empty_cache() {
// Flush any available blocks into the free_list.
process_events();
// Remove all elements from the free list, remove them from the blocks
// list, and free the associated pinned memory allocation. This requires
// concurrently holding both the free list mutex and the blocks mutex, and
// is the only function that concurrently holds multiple mutexes.
std::lock(free_list_mutex_, blocks_mutex_);
std::lock_guard<std::mutex> gf(free_list_mutex_, std::adopt_lock);
std::lock_guard<std::mutex> gb(blocks_mutex_, std::adopt_lock);
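// std::lock acquires both mutexes with built-in deadlock avoidance; the
// adopting lock_guards above merely take ownership for scope-based release.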
std::vector<B*> blocks_to_remove(free_list_.begin(), free_list_.end());
free_list_.clear();
for (auto* block : blocks_to_remove) {
blocks_.erase(block);
ptr_to_block_.erase(block->ptr_);
free_block(block);
delete block;
}
}
virtual void copy_data(void* dest, const void* src, std::size_t count) const {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for copy_data");
}
private:
virtual void add_allocated_block(B* block) {
std::lock_guard<std::mutex> g(blocks_mutex_);
blocks_.insert(block);
ptr_to_block_.insert({block->ptr_, block});
}
virtual B* get_free_block(size_t size) {
std::lock_guard<std::mutex> g(free_list_mutex_);
B key(size);
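// The search key's ptr_ is null, so lower_bound returns the smallest block
// whose size_ >= size: a best-fit lookup over the size-ordered free list.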
auto it = free_list_.lower_bound(&key);
if (it != free_list_.end()) {
B* block = *it;
block->allocated_ = true;
free_list_.erase(it);
return block;
}
return nullptr;
}
virtual void process_events() {
while (true) {
// Avoid calling the event destructor (e.g. cudaEventDestroy) while holding
// a mutex, so move the oldest event out from under the lock into this
// local before querying it.
std::optional<std::pair<E, B*>> processed;
{
std::lock_guard<std::mutex> g(events_mutex_);
if (!events_.empty()) {
processed = std::move(events_.back());
events_.pop_back();
}
}
if (!processed) {
return;
}
// Otherwise, query the event to see whether it has completed.
{
auto& event = processed->first;
if (!query_event(event)) {
// push the event onto the back if it's not ready.
{
std::lock_guard<std::mutex> g(events_mutex_);
events_.push_back(std::move(*processed));
}
return;
}
}
// The event completed; decrement the block's outstanding event count.
TORCH_INTERNAL_ASSERT(processed);
auto* block = processed->second;
bool available = false;
{
std::lock_guard<std::mutex> g(block->mutex_);
TORCH_INTERNAL_ASSERT(!block->allocated_);
block->event_count_--;
if (block->event_count_ == 0) {
available = true;
}
}
if (available) {
std::lock_guard<std::mutex> g(free_list_mutex_);
free_list_.insert(block);
}
}
}
/* The following functions are runtime-related and are meant to be
specialized by each backend. */
// Allocate page-locked memory on the host.
virtual void allocate_host_memory(size_t size, void** ptr) {
TORCH_CHECK_NOT_IMPLEMENTED(
false, "Not implemented for allocate_host_memory");
}
// Free block and release the pointer contained in block.
virtual void free_block(B* block) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for free_block");
}
// Record an event on the given stream and append it to events.
virtual void record_stream(std::optional<std::vector<E>>& events, S stream) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for record_stream");
}
// Query whether the event has completed.
virtual bool query_event(E& event) {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for query_event");
}
alignas(64) std::mutex blocks_mutex_;
ska::flat_hash_set<B*> blocks_; // block list
ska::flat_hash_map<void*, B*> ptr_to_block_;
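// Maps a block's data pointer back to its block; used as a fallback in
// record_event when the caller's ctx does not name a known block.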
// Note: sharding this mutex seems to be profitable in heavily multi-threaded
// scenarios.
alignas(64) std::mutex free_list_mutex_;
// Note: an alternative data structure can yield significant wins here in
// microbenchmarks.
std::set<B*, C> free_list_; // free list
alignas(64) std::mutex events_mutex_;
std::deque<std::pair<E, B*>> events_; // event queue paired with block
};
template <typename T>
struct CachingHostAllocatorInterface : public at::Allocator {
CachingHostAllocatorInterface() : impl_(std::make_unique<T>()) {}
at::DataPtr allocate(size_t size) override {
TORCH_CHECK_NOT_IMPLEMENTED(false, "Not implemented for allocate");
}
void free(void* ctx) {
impl_->free(ctx);
}
template <typename S>
bool record_event(void* ptr, void* ctx, S stream) {
return impl_->record_event(ptr, ctx, stream);
}
void empty_cache() {
impl_->empty_cache();
}
void copy_data(void* dest, const void* src, std::size_t count)
const override {
impl_->copy_data(dest, src, count);
}
std::unique_ptr<T> impl_;
};
} // namespace at