From bd30c510bf5727b97671898a913c18672da7e371 Mon Sep 17 00:00:00 2001 From: Kathy Xu Date: Fri, 20 Jun 2025 11:22:36 -0700 Subject: [PATCH] add monitoring metrics for dram cache perf Summary: Add metrics breakdown to better understand DRAM cache perf - read perf breakdown - write perf breakdown: with a distinguisher between fwd `l1_eviction_write` and bwd `l1_cnflct_miss_write` - emb allocation and actual chunk allocated size Reviewed By: duduyi2013 Differential Revision: D76309945 --- fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py | 194 ++++++++++- .../SynchronizedShardedMap.h | 17 +- .../dram_kv_embedding_cache.h | 326 +++++++++++++++++- .../dram_kv_embedding_cache_wrapper.h | 10 +- .../fixed_block_pool.h | 14 +- .../kv_db_table_batched_embeddings.h | 14 +- .../ssd_split_table_batched_embeddings.cpp | 5 +- .../ssd_table_batched_embeddings.h | 2 +- .../sharded_map_test.cpp | 15 +- 9 files changed, 565 insertions(+), 32 deletions(-) diff --git a/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py b/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py index 1c58658f8f..4db51eff30 100644 --- a/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py +++ b/fbgemm_gpu/fbgemm_gpu/tbe/ssd/training.py @@ -915,6 +915,12 @@ def __init__( self.l2_cache_capacity_stats_name: str = ( f"l2_cache.mem.tbe_id{tbe_unique_id}.capacity_bytes" ) + self.dram_kv_actual_used_chunk_bytes_stats_name: str = ( + f"dram_kv.mem.tbe_id{tbe_unique_id}.actual_used_chunk_bytes" + ) + self.dram_kv_allocated_bytes_stats_name: str = ( + f"dram_kv.mem.tbe_id{tbe_unique_id}.allocated_bytes" + ) if self.stats_reporter: self.ssd_prefetch_read_timer = AsyncSeriesTimer( functools.partial( @@ -939,6 +945,10 @@ def __init__( self.stats_reporter.register_stats(self.l2_num_cache_evictions_stats_name) self.stats_reporter.register_stats(self.l2_cache_free_mem_stats_name) self.stats_reporter.register_stats(self.l2_cache_capacity_stats_name) + self.stats_reporter.register_stats(self.dram_kv_allocated_bytes_stats_name) + 
self.stats_reporter.register_stats( + self.dram_kv_actual_used_chunk_bytes_stats_name + ) self.bounds_check_version: int = get_bounds_check_version_for_platform() @@ -1890,7 +1900,7 @@ def _prefetch( # noqa C901 self.ssd_cache_stats = torch.add( self.ssd_cache_stats, self.local_ssd_cache_stats ) - self._report_ssd_stats() + self._report_kv_backend_stats() # Fetch data from SSD if linear_cache_indices.numel() > 0: @@ -2881,7 +2891,7 @@ def prepare_inputs( return indices, offsets, per_sample_weights, vbe_metadata @torch.jit.ignore - def _report_ssd_stats(self) -> None: + def _report_kv_backend_stats(self) -> None: """ All ssd stats report function entrance """ @@ -2896,6 +2906,8 @@ def _report_ssd_stats(self) -> None: self._report_ssd_io_stats() self._report_ssd_mem_usage() self._report_l2_cache_perf_stats() + if self.backend_type == BackendType.DRAM: + self._report_dram_kv_perf_stats() @torch.jit.ignore def _report_ssd_l1_cache_stats(self) -> None: @@ -3162,6 +3174,184 @@ def _report_l2_cache_perf_stats(self) -> None: time_unit="us", ) + @torch.jit.ignore + def _report_dram_kv_perf_stats(self) -> None: + """ + EmbeddingKVDB will hold stats for DRAM cache performance in fwd/bwd + this function fetch the stats from EmbeddingKVDB and report it with stats_reporter + """ + if self.stats_reporter is None: + return + + stats_reporter: TBEStatsReporter = self.stats_reporter + if not stats_reporter.should_report(self.step): + return + + dram_kv_perf_stats = self.ssd_db.get_dram_kv_perf( + self.step, stats_reporter.report_interval # pyre-ignore + ) + + if len(dram_kv_perf_stats) != 22: + logging.error("dram cache perf stats should have 22 elements") + return + + dram_read_duration = dram_kv_perf_stats[0] + dram_read_sharding_duration = dram_kv_perf_stats[1] + dram_read_cache_hit_copy_duration = dram_kv_perf_stats[2] + dram_read_fill_row_storage_duration = dram_kv_perf_stats[3] + dram_read_lookup_cache_duration = dram_kv_perf_stats[4] + dram_read_acquire_lock_duration = 
dram_kv_perf_stats[5] + dram_read_missing_load = dram_kv_perf_stats[6] + dram_write_sharing_duration = dram_kv_perf_stats[7] + + dram_fwd_l1_eviction_write_duration = dram_kv_perf_stats[8] + dram_fwd_l1_eviction_write_allocate_duration = dram_kv_perf_stats[9] + dram_fwd_l1_eviction_write_cache_copy_duration = dram_kv_perf_stats[10] + dram_fwd_l1_eviction_write_lookup_cache_duration = dram_kv_perf_stats[11] + dram_fwd_l1_eviction_write_acquire_lock_duration = dram_kv_perf_stats[12] + dram_fwd_l1_eviction_write_missing_load = dram_kv_perf_stats[13] + + dram_bwd_l1_cnflct_miss_write_duration = dram_kv_perf_stats[14] + dram_bwd_l1_cnflct_miss_write_allocate_duration = dram_kv_perf_stats[15] + dram_bwd_l1_cnflct_miss_write_cache_copy_duration = dram_kv_perf_stats[16] + dram_bwd_l1_cnflct_miss_write_lookup_cache_duration = dram_kv_perf_stats[17] + dram_bwd_l1_cnflct_miss_write_acquire_lock_duration = dram_kv_perf_stats[18] + dram_bwd_l1_cnflct_miss_write_missing_load = dram_kv_perf_stats[19] + + dram_kv_allocated_bytes = dram_kv_perf_stats[20] + dram_kv_actual_used_chunk_bytes = dram_kv_perf_stats[21] + + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.get.dram_read_duration_us", + duration_ms=dram_read_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.get.dram_read_sharding_duration_us", + duration_ms=dram_read_sharding_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.get.dram_read_cache_hit_copy_duration_us", + duration_ms=dram_read_cache_hit_copy_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.get.dram_read_fill_row_storage_duration_us", + duration_ms=dram_read_fill_row_storage_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + 
event_name="dram_kv.perf.get.dram_read_lookup_cache_duration_us", + duration_ms=dram_read_lookup_cache_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.get.dram_read_acquire_lock_duration_us", + duration_ms=dram_read_acquire_lock_duration, + time_unit="us", + ) + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name="dram_kv.perf.get.dram_read_missing_load", + data_bytes=dram_read_missing_load, + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_write_sharing_duration_us", + duration_ms=dram_write_sharing_duration, + time_unit="us", + ) + + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_fwd_l1_eviction_write_duration_us", + duration_ms=dram_fwd_l1_eviction_write_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_fwd_l1_eviction_write_allocate_duration_us", + duration_ms=dram_fwd_l1_eviction_write_allocate_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_fwd_l1_eviction_write_cache_copy_duration_us", + duration_ms=dram_fwd_l1_eviction_write_cache_copy_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_fwd_l1_eviction_write_lookup_cache_duration_us", + duration_ms=dram_fwd_l1_eviction_write_lookup_cache_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_fwd_l1_eviction_write_acquire_lock_duration_us", + duration_ms=dram_fwd_l1_eviction_write_acquire_lock_duration, + time_unit="us", + ) + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_fwd_l1_eviction_write_missing_load", + 
data_bytes=dram_fwd_l1_eviction_write_missing_load, + ) + + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_bwd_l1_cnflct_miss_write_duration_us", + duration_ms=dram_bwd_l1_cnflct_miss_write_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_bwd_l1_cnflct_miss_write_allocate_duration_us", + duration_ms=dram_bwd_l1_cnflct_miss_write_allocate_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_bwd_l1_cnflct_miss_write_cache_copy_duration_us", + duration_ms=dram_bwd_l1_cnflct_miss_write_cache_copy_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_bwd_l1_cnflct_miss_write_lookup_cache_duration_us", + duration_ms=dram_bwd_l1_cnflct_miss_write_lookup_cache_duration, + time_unit="us", + ) + stats_reporter.report_duration( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_bwd_l1_cnflct_miss_write_acquire_lock_duration_us", + duration_ms=dram_bwd_l1_cnflct_miss_write_acquire_lock_duration, + time_unit="us", + ) + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name="dram_kv.perf.set.dram_bwd_l1_cnflct_miss_write_missing_load", + data_bytes=dram_bwd_l1_cnflct_miss_write_missing_load, + ) + + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name=self.dram_kv_allocated_bytes_stats_name, + data_bytes=dram_kv_allocated_bytes, + ) + stats_reporter.report_data_amount( + iteration_step=self.step, + event_name=self.dram_kv_actual_used_chunk_bytes_stats_name, + data_bytes=dram_kv_actual_used_chunk_bytes, + ) + # pyre-ignore def _recording_to_timer( self, timer: Optional[AsyncSeriesTimer], **kwargs: Any diff --git a/fbgemm_gpu/src/dram_kv_embedding_cache/SynchronizedShardedMap.h b/fbgemm_gpu/src/dram_kv_embedding_cache/SynchronizedShardedMap.h 
index def6865da2..5a7b6ef63c 100644 --- a/fbgemm_gpu/src/dram_kv_embedding_cache/SynchronizedShardedMap.h +++ b/fbgemm_gpu/src/dram_kv_embedding_cache/SynchronizedShardedMap.h @@ -58,13 +58,24 @@ class SynchronizedShardedMap { return shards_.size(); } - auto getUsedMemSize() const { + auto getUsedMemSizeInBytes() const { size_t used_mem_size = 0; size_t block_size = mempools_[0]->get_aligned_block_size(); for (size_t i = 0; i < shards_.size(); ++i) { - auto rlmap = shards_[i].rlock(); + int64_t mempool_idx = i % mempools_.size(); // only calculate the sizes of K, V and block that are used - used_mem_size += rlmap->size() * (sizeof(K) + sizeof(V) + block_size); + if (mempools_[mempool_idx]->get_allocated_chunk_bytes() > 0) { + auto rlmap = shards_[i].rlock(); + used_mem_size += rlmap->size() * (sizeof(K) + sizeof(V) + block_size); + } + } + return used_mem_size; + } + + auto getActualUsedChunkInBytes() const { + size_t used_mem_size = 0; + for (size_t i = 0; i < mempools_.size(); ++i) { + used_mem_size += mempools_[i]->get_allocated_chunk_bytes(); } return used_mem_size; } diff --git a/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h b/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h index 3788df8c7d..f1c5fc1bbf 100644 --- a/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h +++ b/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache.h @@ -22,6 +22,7 @@ #include #include #include +#include "common/time/Time.h" #include "../ssd_split_embeddings_cache/initializer.h" #include "../ssd_split_embeddings_cache/kv_db_table_batched_embeddings.h" @@ -241,22 +242,43 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { const at::Tensor& count, const kv_db::RocksdbWriteMode w_mode = kv_db::RocksdbWriteMode::FWD_ROCKSDB_READ /*unused*/) override { + auto start_ts = facebook::WallClockUtil::NowInUsecFast(); pause_ongoing_eviction(); - std::vector> futures; + std::vector< + folly::Future>> + futures; + auto 
before_shard_ts = facebook::WallClockUtil::NowInUsecFast(); auto shardid_to_indexes = shard_input(indices, count); + write_sharding_total_duration_ += + facebook::WallClockUtil::NowInUsecFast() - before_shard_ts; + for (auto iter = shardid_to_indexes.begin(); iter != shardid_to_indexes.end(); iter++) { const auto shard_id = iter->first; const auto indexes = iter->second; - auto f = + futures.emplace_back( folly::via(executor_.get()) .thenValue([this, shard_id, indexes, &indices, &weights]( folly::Unit) { + int64_t local_write_allocate_total_duration = 0; + int64_t local_write_cache_copy_total_duration = 0; + int64_t local_write_lookup_cache_total_duration = 0; + int64_t local_write_acquire_lock_duration = 0; + int64_t local_write_missing_load = 0; FBGEMM_DISPATCH_INTEGRAL_TYPES( indices.scalar_type(), "dram_kv_set", - [this, shard_id, indexes, &indices, &weights] { + [this, + shard_id, + indexes, + &indices, + &weights, + &local_write_allocate_total_duration, + &local_write_cache_copy_total_duration, + &local_write_lookup_cache_total_duration, + &local_write_acquire_lock_duration, + &local_write_missing_load] { using index_t = scalar_t; CHECK(indices.is_contiguous()); CHECK(weights.is_contiguous()); @@ -265,7 +287,12 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { auto indices_data_ptr = indices.data_ptr(); auto weights_data_ptr = weights.data_ptr(); { + auto before_write_lock_ts = + facebook::WallClockUtil::NowInUsecFast(); auto wlmap = kv_store_.by(shard_id).wlock(); + local_write_acquire_lock_duration = + facebook::WallClockUtil::NowInUsecFast() - + before_write_lock_ts; auto* pool = kv_store_.pool_by(shard_id); for (auto index_iter = indexes.begin(); index_iter != indexes.end(); @@ -275,14 +302,25 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { // use mempool weight_type* block = nullptr; // First check if the key already exists + auto before_lookup_cache_ts = + facebook::WallClockUtil::NowInUsecFast(); auto it = wlmap->find(id); + 
local_write_lookup_cache_total_duration += + facebook::WallClockUtil::NowInUsecFast() - + before_lookup_cache_ts; if (it != wlmap->end()) { block = it->second; } else { // Key doesn't exist, allocate new block and insert. + local_write_missing_load++; + auto before_alloc_ts = + facebook::WallClockUtil::NowInUsecFast(); block = pool->template allocate_t(); FixedBlockPool::set_key(block, id); wlmap->insert({id, block}); + local_write_allocate_total_duration += + facebook::WallClockUtil::NowInUsecFast() - + before_alloc_ts; } if (feature_evict_config_.has_value() && feature_evict_config_.value()->trigger_mode_ != @@ -290,19 +328,85 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { feature_evict_) { feature_evict_->update_feature_statistics(block); } + auto before_copy_ts = + facebook::WallClockUtil::NowInUsecFast(); auto* data_ptr = FixedBlockPool::data_ptr(block); std::copy( weights_data_ptr + id_index * stride, weights_data_ptr + (id_index + 1) * stride, data_ptr); + local_write_cache_copy_total_duration += + facebook::WallClockUtil::NowInUsecFast() - + before_copy_ts; } } }); - }); - futures.push_back(std::move(f)); + return std::make_tuple( + local_write_allocate_total_duration, + local_write_cache_copy_total_duration, + local_write_lookup_cache_total_duration, + local_write_acquire_lock_duration, + local_write_missing_load); + })); } - return folly::collect(futures); + return folly::collect(std::move(futures)) + .via(executor_.get()) + .thenValue( + [this, start_ts, w_mode]( + const std::vector< + std::tuple>& + results) { + int64_t write_allocate_total_duration = 0; + int64_t write_cache_copy_total_duration = 0; + int64_t write_lookup_cache_total_duration = 0; + int64_t write_acquire_lock_total_duration = 0; + int64_t write_missing_load = 0; + for (const auto& tup : results) { + write_allocate_total_duration += std::get<0>(tup); + write_cache_copy_total_duration += std::get<1>(tup); + write_lookup_cache_total_duration += std::get<2>(tup); + 
write_acquire_lock_total_duration += std::get<3>(tup); + write_missing_load += std::get<4>(tup); + } + auto duration = + facebook::WallClockUtil::NowInUsecFast() - start_ts; + switch (w_mode) { + case kv_db::RocksdbWriteMode::BWD_L1_CNFLCT_MISS_WRITE_BACK: + bwd_l1_cnflct_miss_write_total_duration_ += duration; + bwd_l1_cnflct_miss_write_allocate_avg_duration_ += + write_allocate_total_duration / num_shards_; + bwd_l1_cnflct_miss_write_cache_copy_avg_duration_ += + write_cache_copy_total_duration / num_shards_; + bwd_l1_cnflct_miss_write_lookup_cache_avg_duration_ += + write_lookup_cache_total_duration / num_shards_; + bwd_l1_cnflct_miss_write_acquire_lock_avg_duration_ += + write_acquire_lock_total_duration / num_shards_; + bwd_l1_cnflct_miss_write_missing_load_avg_ += + write_missing_load / num_shards_; + break; + case kv_db::RocksdbWriteMode::FWD_L1_EVICTION: + fwd_l1_eviction_write_total_duration_ += duration; + fwd_l1_eviction_write_allocate_avg_duration_ += + write_allocate_total_duration / num_shards_; + fwd_l1_eviction_write_cache_copy_avg_duration_ += + write_cache_copy_total_duration / num_shards_; + fwd_l1_eviction_write_lookup_cache_avg_duration_ += + write_lookup_cache_total_duration / num_shards_; + fwd_l1_eviction_write_acquire_lock_avg_duration_ += + write_acquire_lock_total_duration / num_shards_; + fwd_l1_eviction_write_missing_load_avg_ += + write_missing_load / num_shards_; + break; + case kv_db::RocksdbWriteMode::FWD_ROCKSDB_READ: + break; + case kv_db::RocksdbWriteMode::FLUSH: + break; + case kv_db::RocksdbWriteMode::STREAM: + break; + } + return std::vector(results.size()); + }); } /// Get embeddings from kvstore. 
@@ -324,20 +428,26 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { std::optional width_length = std::nullopt) { // assuming get is called once each iteration and only by train // iteration(excluding state_dict) + auto start_ts = facebook::WallClockUtil::NowInUsecFast(); pause_ongoing_eviction(); // noop calls, no impact if called multiple times - std::vector> futures; + std::vector< + folly::Future>> + futures; auto row_width = weights.size(1); auto copy_width = width_length.value_or(row_width); CHECK_LE(row_width, max_D_); CHECK_EQ(copy_width, row_width); + auto before_shard_ts = facebook::WallClockUtil::NowInUsecFast(); auto shardid_to_indexes = shard_input(indices, count); + read_sharding_total_duration_ += + facebook::WallClockUtil::NowInUsecFast() - before_shard_ts; for (auto iter = shardid_to_indexes.begin(); iter != shardid_to_indexes.end(); iter++) { const auto shard_id = iter->first; const auto indexes = iter->second; - auto f = + futures.emplace_back( folly::via(executor_.get()) .thenValue([this, shard_id, @@ -346,6 +456,11 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { &weights, width_offset, row_width](folly::Unit) { + int64_t local_read_cache_hit_copy_total_duration = 0; + int64_t local_read_fill_row_storage_total_duration = 0; + int64_t local_read_lookup_cache_total_duration = 0; + int64_t local_read_aquire_lock_duration = 0; + int64_t local_read_missing_load = 0; FBGEMM_DISPATCH_INTEGRAL_TYPES( indices.scalar_type(), "dram_kvstore_set", @@ -355,7 +470,12 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { &indices, &weights, width_offset, - row_width] { + row_width, + &local_read_cache_hit_copy_total_duration, + &local_read_fill_row_storage_total_duration, + &local_read_lookup_cache_total_duration, + &local_read_aquire_lock_duration, + &local_read_missing_load] { using index_t = scalar_t; CHECK(indices.is_contiguous()); CHECK(weights.is_contiguous()); @@ -372,7 +492,12 @@ class DramKVEmbeddingCache : public 
kv_db::EmbeddingKVDB { ") types mismatch"); auto row_storage_data_ptr = init_storage.template data_ptr(); + auto before_read_lock_ts = + facebook::WallClockUtil::NowInUsecFast(); auto wlmap = kv_store_.by(shard_id).wlock(); + local_read_aquire_lock_duration = + facebook::WallClockUtil::NowInUsecFast() - + before_read_lock_ts; auto indices_data_ptr = indices.data_ptr(); { for (auto index_iter = indexes.begin(); @@ -383,10 +508,18 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { const auto weights_row_index = *index_iter; auto weight_idx = int64_t(indices_data_ptr[weights_row_index]); + auto before_lookup_cache_ts = + facebook::WallClockUtil::NowInUsecFast(); const auto cached_iter = wlmap->find(weight_idx); + local_read_lookup_cache_total_duration += + facebook::WallClockUtil::NowInUsecFast() - + before_lookup_cache_ts; if (cached_iter == wlmap->end()) { + local_read_missing_load++; auto weight_width = get_width_for_weights( weight_idx, width_offset, row_width); + auto before_fill_from_row_storage_ts = + facebook::WallClockUtil::NowInUsecFast(); fill_from_row_storage( shard_id, reinterpret_cast( @@ -397,25 +530,71 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { width_offset, row_width, weight_width); + local_read_fill_row_storage_total_duration += + facebook::WallClockUtil::NowInUsecFast() - + before_fill_from_row_storage_ts; continue; } // use mempool const auto* data_ptr = FixedBlockPool::data_ptr( cached_iter->second); + auto before_cache_hit_copy_ts = + facebook::WallClockUtil::NowInUsecFast(); std::copy( data_ptr + width_offset, data_ptr + width_offset + row_width, &(weights_data_ptr [weights_row_index * row_width])); // dst_start + local_read_cache_hit_copy_total_duration += + facebook::WallClockUtil::NowInUsecFast() - + before_cache_hit_copy_ts; } } }); - }); - futures.push_back(std::move(f)); + return std::make_tuple( + local_read_lookup_cache_total_duration, + local_read_fill_row_storage_total_duration, + 
local_read_cache_hit_copy_total_duration, + local_read_aquire_lock_duration, + local_read_missing_load); + })); } - return folly::collect(futures); + + return folly::collect(std::move(futures)) + .via(executor_.get()) + .thenValue( + [this, + start_ts](const std::vector< + std::tuple>& + results) { + int64_t read_lookup_cache_total_duration = 0; + int64_t read_fill_row_storage_total_duration = 0; + int64_t read_cache_hit_copy_total_duration = 0; + int64_t read_acquire_lock_total_duration = 0; + int64_t read_missing_load = 0; + for (const auto& tup : results) { + read_lookup_cache_total_duration += std::get<0>(tup); + read_fill_row_storage_total_duration += std::get<1>(tup); + read_cache_hit_copy_total_duration += std::get<2>(tup); + read_acquire_lock_total_duration += std::get<3>(tup); + read_missing_load += std::get<4>(tup); + } + auto duration = + facebook::WallClockUtil::NowInUsecFast() - start_ts; + read_total_duration_ += duration; + read_cache_hit_copy_avg_duration_ += + read_cache_hit_copy_total_duration / num_shards_; + read_fill_row_storage_avg_duration_ += + read_fill_row_storage_total_duration / num_shards_; + read_lookup_cache_total_avg_duration_ += + read_lookup_cache_total_duration / num_shards_; + read_acquire_lock_avg_duration_ += + read_acquire_lock_total_duration / num_shards_; + read_missing_load_avg_ += read_missing_load / num_shards_; + return std::vector(results.size()); + }); }; folly::SemiFuture> get_kv_db_async( @@ -496,7 +675,7 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { break; } case EvictTriggerMode::MEM_UTIL: { - auto mem_util = get_map_used_memsize() / (1024 * 1024 * 1024); + auto mem_util = get_map_used_memsize_in_bytes() / (1024 * 1024 * 1024); if (mem_util > feature_evict_config_.value()->mem_util_threshold_in_GB_.value()) { trigger_feature_evict(); @@ -515,8 +694,12 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { } } - size_t get_map_used_memsize() const override { - return 
kv_store_.getUsedMemSize(); + size_t get_map_used_memsize_in_bytes() const override { + return kv_store_.getUsedMemSizeInBytes(); + } + + size_t get_map_actual_used_chunk_in_bytes() const { + return kv_store_.getActualUsedChunkInBytes(); } std::optional get_feature_evict_metric() @@ -661,6 +844,92 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { } } + std::vector get_dram_kv_perf( + const int64_t step, + const int64_t interval) { + std::vector ret(22, 0); // num metrics + + allocated_memory_ = get_map_used_memsize_in_bytes(); + actual_used_chunk_memory_ = get_map_actual_used_chunk_in_bytes(); + if (step > 0 && step % interval == 0) { + int reset_val = 0; + + auto dram_read_total_duration = read_total_duration_.exchange(reset_val); + auto dram_read_sharding_total_duration = + read_sharding_total_duration_.exchange(reset_val); + auto dram_read_cache_hit_copy_duration = + read_cache_hit_copy_avg_duration_.exchange(reset_val); + auto dram_read_fill_row_storage_duration = + read_fill_row_storage_avg_duration_.exchange(reset_val); + auto dram_read_lookup_cache_duration = + read_lookup_cache_total_avg_duration_.exchange(reset_val); + auto dram_read_acquire_lock_duration = + read_acquire_lock_avg_duration_.exchange(reset_val); + auto dram_read_missing_load = read_missing_load_avg_.exchange(reset_val); + auto dram_write_sharding_total_duration = + write_sharding_total_duration_.exchange(reset_val); + + auto dram_fwd_l1_eviction_write_total_duration = + fwd_l1_eviction_write_total_duration_.exchange(reset_val); + auto dram_fwd_l1_eviction_write_allocate_duration = + fwd_l1_eviction_write_allocate_avg_duration_.exchange(reset_val); + auto dram_fwd_l1_eviction_write_cache_copy_duration = + fwd_l1_eviction_write_cache_copy_avg_duration_.exchange(reset_val); + auto dram_fwd_l1_eviction_write_lookup_cache_duration = + fwd_l1_eviction_write_lookup_cache_avg_duration_.exchange(reset_val); + auto dram_fwd_l1_eviction_write_acquire_lock_duration_ = + 
fwd_l1_eviction_write_acquire_lock_avg_duration_.exchange(reset_val); + auto dram_fwd_l1_eviction_write_missing_load_ = + fwd_l1_eviction_write_missing_load_avg_.exchange(reset_val); + + auto dram_bwd_l1_cnflct_miss_write_total_duration = + bwd_l1_cnflct_miss_write_total_duration_.exchange(reset_val); + auto dram_bwd_l1_cnflct_miss_write_allocate_duration = + bwd_l1_cnflct_miss_write_allocate_avg_duration_.exchange(reset_val); + auto dram_bwd_l1_cnflct_miss_write_cache_copy_duration = + bwd_l1_cnflct_miss_write_cache_copy_avg_duration_.exchange(reset_val); + auto dram_bwd_l1_cnflct_miss_write_lookup_cache_duration = + bwd_l1_cnflct_miss_write_lookup_cache_avg_duration_.exchange( + reset_val); + auto dram_bwd_l1_cnflct_miss_write_acquire_lock_duration_ = + bwd_l1_cnflct_miss_write_acquire_lock_avg_duration_.exchange( + reset_val); + auto dram_bwd_l1_cnflct_miss_write_missing_load_ = + bwd_l1_cnflct_miss_write_missing_load_avg_.exchange(reset_val); + + auto dram_allocated_memory = allocated_memory_.exchange(reset_val); + auto dram_actual_used_chunk_memory = + actual_used_chunk_memory_.exchange(reset_val); + + ret[0] = dram_read_total_duration / interval; + ret[1] = dram_read_sharding_total_duration / interval; + ret[2] = dram_read_cache_hit_copy_duration / interval; + ret[3] = dram_read_fill_row_storage_duration / interval; + ret[4] = dram_read_lookup_cache_duration / interval; + ret[5] = dram_read_acquire_lock_duration / interval; + ret[6] = dram_read_missing_load / interval; + ret[7] = dram_write_sharding_total_duration / interval; + + ret[8] = dram_fwd_l1_eviction_write_total_duration / interval; + ret[9] = dram_fwd_l1_eviction_write_allocate_duration / interval; + ret[10] = dram_fwd_l1_eviction_write_cache_copy_duration / interval; + ret[11] = dram_fwd_l1_eviction_write_lookup_cache_duration / interval; + ret[12] = dram_fwd_l1_eviction_write_acquire_lock_duration_ / interval; + ret[13] = dram_fwd_l1_eviction_write_missing_load_ / interval; + + ret[14] = 
dram_bwd_l1_cnflct_miss_write_total_duration / interval; + ret[15] = dram_bwd_l1_cnflct_miss_write_allocate_duration / interval; + ret[16] = dram_bwd_l1_cnflct_miss_write_cache_copy_duration / interval; + ret[17] = dram_bwd_l1_cnflct_miss_write_lookup_cache_duration / interval; + ret[18] = dram_bwd_l1_cnflct_miss_write_acquire_lock_duration_ / interval; + ret[19] = dram_bwd_l1_cnflct_miss_write_missing_load_ / interval; + + ret[20] = dram_allocated_memory / interval; + ret[21] = dram_actual_used_chunk_memory / interval; + } + return ret; + } + std::unique_ptr executor_; // background thread folly::FunctionScheduler scheduler_; @@ -678,6 +947,33 @@ class DramKVEmbeddingCache : public kv_db::EmbeddingKVDB { std::optional> feature_evict_config_; std::unique_ptr> feature_evict_; int current_iter_ = 0; + + // perf stats + std::atomic read_total_duration_{0}; + std::atomic read_sharding_total_duration_{0}; + std::atomic read_cache_hit_copy_avg_duration_{0}; + std::atomic read_fill_row_storage_avg_duration_{0}; + std::atomic read_lookup_cache_total_avg_duration_{0}; + std::atomic read_acquire_lock_avg_duration_{0}; + std::atomic read_missing_load_avg_{0}; + std::atomic write_sharding_total_duration_{0}; + + std::atomic bwd_l1_cnflct_miss_write_total_duration_{0}; + std::atomic bwd_l1_cnflct_miss_write_allocate_avg_duration_{0}; + std::atomic bwd_l1_cnflct_miss_write_cache_copy_avg_duration_{0}; + std::atomic bwd_l1_cnflct_miss_write_lookup_cache_avg_duration_{0}; + std::atomic bwd_l1_cnflct_miss_write_acquire_lock_avg_duration_{0}; + std::atomic bwd_l1_cnflct_miss_write_missing_load_avg_{0}; + + std::atomic fwd_l1_eviction_write_total_duration_{0}; + std::atomic fwd_l1_eviction_write_allocate_avg_duration_{0}; + std::atomic fwd_l1_eviction_write_cache_copy_avg_duration_{0}; + std::atomic fwd_l1_eviction_write_lookup_cache_avg_duration_{0}; + std::atomic fwd_l1_eviction_write_acquire_lock_avg_duration_{0}; + std::atomic fwd_l1_eviction_write_missing_load_avg_{0}; + + 
std::atomic allocated_memory_{0}; + std::atomic actual_used_chunk_memory_{0}; }; // class DramKVEmbeddingCache } // namespace kv_mem diff --git a/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache_wrapper.h b/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache_wrapper.h index 851002664e..77d1832a87 100644 --- a/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache_wrapper.h +++ b/fbgemm_gpu/src/dram_kv_embedding_cache/dram_kv_embedding_cache_wrapper.h @@ -121,8 +121,14 @@ class DramKVEmbeddingCacheWrapper : public torch::jit::CustomClassHolder { return impl_->get_keys_in_range_impl(start, end, std::nullopt); } - size_t get_map_used_memsize() const { - return impl_->get_map_used_memsize(); + size_t get_map_used_memsize_in_bytes() const { + return impl_->get_map_used_memsize_in_bytes(); + } + + std::vector get_dram_kv_perf( + const int64_t step, + const int64_t interval) { + return impl_->get_dram_kv_perf(step, interval); } void get_feature_evict_metric( diff --git a/fbgemm_gpu/src/dram_kv_embedding_cache/fixed_block_pool.h b/fbgemm_gpu/src/dram_kv_embedding_cache/fixed_block_pool.h index b67fe58b9e..727de1ed01 100644 --- a/fbgemm_gpu/src/dram_kv_embedding_cache/fixed_block_pool.h +++ b/fbgemm_gpu/src/dram_kv_embedding_cache/fixed_block_pool.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include #include @@ -161,6 +162,7 @@ class FixedBlockPool : public std::pmr::memory_resource { // Release all allocated memory during destruction ~FixedBlockPool() override { + std::lock_guard guard(chunks_mutex_); for (auto&& chunk : chunks_) { upstream_->deallocate(chunk.ptr, chunk.size, chunk.alignment); } @@ -181,6 +183,7 @@ class FixedBlockPool : public std::pmr::memory_resource { template scalar_t* get_block(size_t index) { + std::lock_guard guard(chunks_mutex_); char* current_chunk = static_cast(chunks_[index / blocks_per_chunk_].ptr); char* block = current_chunk + block_size_ * (index % blocks_per_chunk_); @@ -208,6 +211,11 
@@ class FixedBlockPool : public std::pmr::memory_resource { block_alignment_; } + [[nodiscard]] std::size_t get_allocated_chunk_bytes() const noexcept { + std::lock_guard guard(chunks_mutex_); + return chunks_.empty() ? 0 : chunks_.size() * chunks_[0].size; + } + protected: // Core allocation function void* do_allocate(std::size_t bytes, std::size_t alignment) override { @@ -254,7 +262,10 @@ class FixedBlockPool : public std::pmr::memory_resource { void* chunk_ptr = upstream_->allocate(chunk_size, block_alignment_); // Record chunk information for later release - chunks_.push_back({chunk_ptr, chunk_size, block_alignment_}); + { + std::lock_guard guard(chunks_mutex_); + chunks_.push_back({chunk_ptr, chunk_size, block_alignment_}); + } // Initialize free list: link blocks in reverse order from chunk end to // beginning (improves locality) @@ -274,5 +285,6 @@ class FixedBlockPool : public std::pmr::memory_resource { std::pmr::memory_resource* upstream_; // Upstream memory resource std::pmr::vector chunks_; // Records of all allocated chunks void* free_list_ = nullptr; // Free block list head pointer + mutable std::mutex chunks_mutex_; // Mutex for chunks_ }; } // namespace kv_mem diff --git a/fbgemm_gpu/src/ssd_split_embeddings_cache/kv_db_table_batched_embeddings.h b/fbgemm_gpu/src/ssd_split_embeddings_cache/kv_db_table_batched_embeddings.h index b6c0f1f7c6..932bca8ad0 100644 --- a/fbgemm_gpu/src/ssd_split_embeddings_cache/kv_db_table_batched_embeddings.h +++ b/fbgemm_gpu/src/ssd_split_embeddings_cache/kv_db_table_batched_embeddings.h @@ -290,7 +290,7 @@ class EmbeddingKVDB : public std::enable_shared_from_this { * * @return Size of memory used by the map in bytes. 
*/ - virtual size_t get_map_used_memsize() const { + virtual size_t get_map_used_memsize_in_bytes() const { FBEXCEPTION("Not implemented"); }; @@ -345,7 +345,15 @@ class EmbeddingKVDB : public std::enable_shared_from_this { FBEXCEPTION("Not implemented"); } - void set_range_to_storage( + virtual std::vector get_dram_kv_perf( + const int64_t step, + const int64_t interval) { + (void)step; + (void)interval; + FBEXCEPTION("Not implemented"); + } + + virtual void set_range_to_storage( const at::Tensor& weights, const int64_t start, const int64_t length) { @@ -373,7 +381,7 @@ class EmbeddingKVDB : public std::enable_shared_from_this { void set_kv_to_storage(const at::Tensor& ids, const at::Tensor& weights) { const auto count = at::tensor({ids.size(0)}, at::ScalarType::Long); - folly::coro::blockingWait(set_kv_db_async(ids, weights, count)); + set_kv_db_async(ids, weights, count).wait(); } virtual void get_kv_from_storage_by_snapshot( diff --git a/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_split_table_batched_embeddings.cpp b/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_split_table_batched_embeddings.cpp index 2daf874897..b04bc3448c 100644 --- a/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_split_table_batched_embeddings.cpp +++ b/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_split_table_batched_embeddings.cpp @@ -911,7 +911,10 @@ static auto dram_kv_embedding_cache_wrapper = &DramKVEmbeddingCacheWrapper::get_keys_in_range_by_snapshot) .def( "get_feature_evict_metric", - &DramKVEmbeddingCacheWrapper::get_feature_evict_metric); + &DramKVEmbeddingCacheWrapper::get_feature_evict_metric) + .def( + "get_dram_kv_perf", + &DramKVEmbeddingCacheWrapper::get_dram_kv_perf); static auto embedding_rocks_db_read_only_wrapper = torch::class_("fbgemm", "ReadOnlyEmbeddingKVDB") .def( diff --git a/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h b/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h index b8daacc331..f09caeb2ca 100644 --- 
a/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h +++ b/fbgemm_gpu/src/ssd_split_embeddings_cache/ssd_table_batched_embeddings.h @@ -721,7 +721,7 @@ class EmbeddingRocksDB : public kv_db::EmbeddingKVDB { void set_kv_to_storage(const at::Tensor& ids, const at::Tensor& weights) { const auto count = at::tensor({ids.size(0)}, at::ScalarType::Long); - folly::coro::blockingWait(set_kv_db_async(ids, weights, count)); + set_kv_db_async(ids, weights, count).wait(); } void get_kv_from_storage_by_snapshot( diff --git a/fbgemm_gpu/test/dram_kv_embedding_cache/sharded_map_test.cpp b/fbgemm_gpu/test/dram_kv_embedding_cache/sharded_map_test.cpp index a7553d6f60..60930ed49b 100644 --- a/fbgemm_gpu/test/dram_kv_embedding_cache/sharded_map_test.cpp +++ b/fbgemm_gpu/test/dram_kv_embedding_cache/sharded_map_test.cpp @@ -164,12 +164,14 @@ void memPoolEmbeddingMemSize(int dimension, size_t numInserts) { wlock->insert_or_assign(i, block); } } - size_t totalMemory = embeddingMap.getUsedMemSize(); + size_t totalMemory = embeddingMap.getUsedMemSizeInBytes(); + size_t actualUsedChunkInBytes = embeddingMap.getActualUsedChunkInBytes(); fmt::print( - "{:<20}{:<20}{:<20.2f}\n", + "{:<20}{:<20}{:<20.2f}{:<20.2f}\n", dimension, numInserts, - static_cast(totalMemory) / (1024 * 1024)); // MB + static_cast(totalMemory) / (1024 * 1024), + static_cast(actualUsedChunkInBytes) / (1024 * 1024)); // MB } int benchmark() { @@ -212,7 +214,12 @@ int benchmark() { fmt::print( "======================= memory usage statistics " "====================================\n"); - fmt::print("{:<20}{:<20}{:<20}\n", "dim", "numInserts", "total memory (MB)"); + fmt::print( + "{:<20}{:<20}{:<20}{:<20}\n", + "dim", + "numInserts", + "total memory (MB)", + "actual used chunk (MB)"); for (int dim : dimensions) { memPoolEmbeddingMemSize(dim, numInserts); }