diff --git a/ydb/core/base/memory_controller_iface.h b/ydb/core/base/memory_controller_iface.h index 672920ecdda7..c6f1b43ed05e 100644 --- a/ydb/core/base/memory_controller_iface.h +++ b/ydb/core/base/memory_controller_iface.h @@ -7,6 +7,10 @@ namespace NKikimr::NMemory { enum class EMemoryConsumerKind { SharedCache, MemTable, + ScanGroupedMemoryLimiter, + CompGroupedMemoryLimiter, + BlobCache, + DataAccessorCache }; struct IMemoryConsumer : public TThrRefBase { @@ -17,7 +21,7 @@ enum EEvMemory { EvConsumerRegister = EventSpaceBegin(TKikimrEvents::ES_MEMORY), EvConsumerRegistered, EvConsumerLimit, - + EvMemTableRegister, EvMemTableRegistered, EvMemTableCompact, @@ -47,10 +51,12 @@ struct TEvConsumerRegistered : public TEventLocal { ui64 LimitBytes; + std::optional HardLimitBytes; - TEvConsumerLimit(ui64 limitBytes) + TEvConsumerLimit(ui64 limitBytes, std::optional hardLimitBytes = std::nullopt) : LimitBytes(limitBytes) - {} + , HardLimitBytes(std::move(hardLimitBytes)) { + } }; struct TEvMemTableRegister : public TEventLocal { diff --git a/ydb/core/base/ut/memory_stats_ut.cpp b/ydb/core/base/ut/memory_stats_ut.cpp index a8aaa34e81f1..21f172719110 100644 --- a/ydb/core/base/ut/memory_stats_ut.cpp +++ b/ydb/core/base/ut/memory_stats_ut.cpp @@ -69,7 +69,7 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) { aggregator.Add(stats1, "host1"); aggregator.Add(stats2, "host2"); aggregator.Add(stats3, "host3"); - + TMemoryStats aggregated = aggregator.Aggregate(); Cerr << aggregated.ShortDebugString() << Endl; @@ -96,7 +96,7 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) { aggregator.Add(stats1, "host1"); aggregator.Add(stats2, "host2"); aggregator.Add(stats3, "host3"); - + TMemoryStats aggregated = aggregator.Aggregate(); Cerr << aggregated.ShortDebugString() << Endl; @@ -119,7 +119,7 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) { aggregator.Add(stats1, "host"); aggregator.Add(stats2, "host"); aggregator.Add(stats3, "host"); - + TMemoryStats aggregated = aggregator.Aggregate(); Cerr << aggregated.ShortDebugString() << Endl; @@ -146,7 +146,7 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) { aggregator.Add(stats1, "host"); aggregator.Add(stats2, "host"); aggregator.Add(stats3, "host"); - + TMemoryStats aggregated = aggregator.Aggregate(); Cerr << aggregated.ShortDebugString() << Endl; @@ -169,7 +169,7 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) { aggregator.Add(stats1, "host1"); aggregator.Add(stats2, "host1"); aggregator.Add(stats3, "host2"); - + TMemoryStats aggregated = aggregator.Aggregate(); Cerr << aggregated.ShortDebugString() << Endl; @@ -177,6 +177,24 @@ Y_UNIT_TEST_SUITE (TMemoryStatsAggregator) { UNIT_ASSERT_VALUES_EQUAL(aggregated.ShortDebugString(), "AnonRss: 36 CGroupLimit: 66 MemTotal: 65 MemAvailable: 85 AllocatedMemory: 156 AllocatorCachesMemory: 186 HardLimit: 145 SoftLimit: 165 TargetUtilization: 185 ExternalConsumption: 194 SharedCacheConsumption: 336 SharedCacheLimit: 366 MemTableConsumption: 396 MemTableLimit: 426 QueryExecutionConsumption: 456 QueryExecutionLimit: 486"); } + + Y_UNIT_TEST(ColumnShard_Single) { + TMemoryStatsAggregator aggregator; + + TMemoryStats stats; + stats.SetColumnTablesReadExecutionConsumption(1); + stats.SetColumnTablesReadExecutionLimit(2); + stats.SetColumnTablesCompactionConsumption(3); + stats.SetColumnTablesCompactionLimit(4); + stats.SetColumnTablesCacheConsumption(5); + stats.SetColumnTablesCacheLimit(6); + + aggregator.Add(stats, "host"); + + TMemoryStats aggregated = aggregator.Aggregate(); + + UNIT_ASSERT_VALUES_EQUAL(aggregated.ShortDebugString(), stats.ShortDebugString()); + } } } diff --git a/ydb/core/memory_controller/memory_controller.cpp b/ydb/core/memory_controller/memory_controller.cpp index 0b5b4f8553a4..47b09fce30ba 100644 --- a/ydb/core/memory_controller/memory_controller.cpp +++ b/ydb/core/memory_controller/memory_controller.cpp @@ -16,8 +16,6 @@ #include #include #include -#include -#include #include #include #include @@ -86,6 +84,7 @@ struct TConsumerState { ui64 MinBytes = 0; ui64 MaxBytes = 0; bool CanZeroLimit = false; + std::optional ExactLimit; TConsumerState(const TMemoryConsumer& consumer) : Kind(consumer.Kind) @@ -266,21 +265,31 @@ class TMemoryController : public TActorBootstrapped { ui64 consumersLimitBytes = 0; for (const auto& consumer : consumers) { - ui64 limitBytes = consumer.GetLimit(coefficient); - if (resultingConsumersConsumption + otherConsumption + externalConsumption > softLimitBytes && consumer.CanZeroLimit) { - limitBytes = SafeDiff(limitBytes, resultingConsumersConsumption + otherConsumption + externalConsumption - softLimitBytes); + const bool isExactLimitConsumer = consumer.ExactLimit.has_value(); + ui64 limitBytes; + if (isExactLimitConsumer) { + limitBytes = consumer.ExactLimit.value(); + } else { + limitBytes = consumer.GetLimit(coefficient); + if (resultingConsumersConsumption + otherConsumption + externalConsumption > softLimitBytes && consumer.CanZeroLimit) { + limitBytes = SafeDiff(limitBytes, resultingConsumersConsumption + otherConsumption + externalConsumption - softLimitBytes); + } } consumersLimitBytes += limitBytes; LOG_INFO_S(ctx, NKikimrServices::MEMORY_CONTROLLER, "Consumer " << consumer.Kind << " state:" << " Consumption: " << HumanReadableBytes(consumer.Consumption) << " Limit: " << HumanReadableBytes(limitBytes) << " Min: " << HumanReadableBytes(consumer.MinBytes) << " Max: " << HumanReadableBytes(consumer.MaxBytes)); - auto& counters = GetConsumerCounters(consumer.Kind); + auto& counters = GetConsumerCounters(consumer.Kind, !isExactLimitConsumer); counters.Consumption->Set(consumer.Consumption); counters.Reservation->Set(SafeDiff(limitBytes, consumer.Consumption)); counters.LimitBytes->Set(limitBytes); - counters.LimitMinBytes->Set(consumer.MinBytes); - counters.LimitMaxBytes->Set(consumer.MaxBytes); + if (counters.LimitMinBytes) { + counters.LimitMinBytes->Set(consumer.MinBytes); + } + if (counters.LimitMaxBytes) { + counters.LimitMaxBytes->Set(consumer.MaxBytes); + } SetMemoryStats(consumer, memoryStats, limitBytes); ApplyLimit(consumer, limitBytes); @@ -289,8 +298,6 @@ class TMemoryController : public TActorBootstrapped { Counters->GetCounter("Stats/ConsumersLimit")->Set(consumersLimitBytes); ProcessResourceBrokerConfig(ctx, memoryStats, hardLimitBytes, activitiesLimitBytes); - ProcessGroupedMemoryLimiterConfig(ctx, memoryStats, hardLimitBytes); - ProcessCacheConfig(ctx, memoryStats, hardLimitBytes); Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(SelfId().NodeId()), memoryStatsUpdate); @@ -361,10 +368,15 @@ class TMemoryController : public TActorBootstrapped { case EMemoryConsumerKind::MemTable: ApplyMemTableLimit(limitBytes); break; - default: + case EMemoryConsumerKind::SharedCache: + case EMemoryConsumerKind::BlobCache: + case EMemoryConsumerKind::DataAccessorCache: Send(consumer.ActorId, new TEvConsumerLimit(limitBytes)); break; - + case EMemoryConsumerKind::ScanGroupedMemoryLimiter: + case EMemoryConsumerKind::CompGroupedMemoryLimiter: + Send(consumer.ActorId, new TEvConsumerLimit(limitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient, limitBytes)); + break; } } @@ -377,43 +389,6 @@ class TMemoryController : public TActorBootstrapped { } } - void ProcessGroupedMemoryLimiterConfig(const TActorContext& /*ctx*/, NKikimrMemory::TMemoryStats& /*memoryStats*/, ui64 hardLimitBytes) { - ui64 columnTablesScanLimitBytes = GetColumnTablesReadExecutionLimitBytes(Config, hardLimitBytes); - ui64 columnTablesCompactionLimitBytes = GetColumnTablesCompactionLimitBytes(Config, hardLimitBytes) * - NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterCompactionLimitCoefficient; - - ApplyGroupedMemoryLimiterConfig(columnTablesScanLimitBytes, columnTablesCompactionLimitBytes); - } - - void ApplyGroupedMemoryLimiterConfig(const ui64 scanHardLimitBytes, const ui64 compactionHardLimitBytes) { - namespace NGroupedMemoryManager = ::NKikimr::NOlap::NGroupedMemoryManager; - using UpdateMemoryLimitsEv = NGroupedMemoryManager::NEvents::TEvExternal::TEvUpdateMemoryLimits; - - Send(NGroupedMemoryManager::TScanMemoryLimiterOperator::MakeServiceId(SelfId().NodeId()), - new UpdateMemoryLimitsEv(scanHardLimitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient, - scanHardLimitBytes)); - - Send(NGroupedMemoryManager::TCompMemoryLimiterOperator::MakeServiceId(SelfId().NodeId()), - new UpdateMemoryLimitsEv(compactionHardLimitBytes * NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterSoftLimitCoefficient, - compactionHardLimitBytes)); - } - - void ProcessCacheConfig(const TActorContext& /*ctx*/, NKikimrMemory::TMemoryStats& /*memoryStats*/, ui64 hardLimitBytes) { - ui64 columnTablesBlobCacheLimitBytes = GetColumnTablesCacheLimitBytes(Config, hardLimitBytes); - - ApplyCacheConfig(columnTablesBlobCacheLimitBytes); - } - - void ApplyCacheConfig(const ui64 cacheLimitBytes) { - using TUpdateMaxBlobCacheDataSizeEv = NBlobCache::TEvBlobCache::TEvUpdateMaxCacheDataSize; - using TGeneralCache = NKikimr::NOlap::NDataAccessorControl::TGeneralCache; - using TGlobalLimits = NKikimr::NOlap::TGlobalLimits; - - Send(NKikimr::NBlobCache::MakeBlobCacheServiceId(), new TUpdateMaxBlobCacheDataSizeEv(cacheLimitBytes * TGlobalLimits::BlobCacheCoefficient)); - - TGeneralCache::UpdateMaxCacheSize(cacheLimitBytes * TGlobalLimits::DataAccessorCoefficient); - } - void ProcessResourceBrokerConfig(const TActorContext& ctx, NKikimrMemory::TMemoryStats& memoryStats, ui64 hardLimitBytes, ui64 activitiesLimitBytes) { ui64 queryExecutionConsumption = TAlignedPagePool::GetGlobalPagePoolSize(); @@ -470,18 +445,21 @@ class TMemoryController : public TActorBootstrapped { queue->MutableLimit()->SetMemory(limitBytes); } - TConsumerCounters& GetConsumerCounters(EMemoryConsumerKind consumer) { + TConsumerCounters& GetConsumerCounters(EMemoryConsumerKind consumer, const bool minMaxRequired) { auto it = ConsumerCounters.FindPtr(consumer); if (it) { return *it; } + TCounterPtr limitMinBytes = minMaxRequired ? Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/LimitMin") : nullptr; + TCounterPtr limitMaxBytes = minMaxRequired ? Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/LimitMax") : nullptr; + return ConsumerCounters.emplace(consumer, TConsumerCounters{ Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/Consumption"), Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/Reservation"), Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/Limit"), - Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/LimitMin"), - Counters->GetCounter(TStringBuilder() << "Consumer/" << consumer << "/LimitMax"), + limitMinBytes, + limitMaxBytes, }).first->second; } @@ -497,6 +475,23 @@ class TMemoryController : public TActorBootstrapped { stats.SetSharedCacheLimit(limitBytes); break; } + case EMemoryConsumerKind::ScanGroupedMemoryLimiter: { + stats.SetColumnTablesReadExecutionConsumption(consumer.Consumption); + stats.SetColumnTablesReadExecutionLimit(limitBytes); + break; + } + case EMemoryConsumerKind::CompGroupedMemoryLimiter: { + stats.SetColumnTablesCompactionConsumption(consumer.Consumption); + stats.SetColumnTablesCompactionLimit(limitBytes); + break; + } + case EMemoryConsumerKind::DataAccessorCache: + case EMemoryConsumerKind::BlobCache: { + stats.SetColumnTablesCacheConsumption( + (stats.HasColumnTablesCacheConsumption() ? stats.GetColumnTablesCacheConsumption() : 0) + consumer.Consumption); + stats.SetColumnTablesCacheLimit((stats.HasColumnTablesCacheLimit() ? stats.GetColumnTablesCacheLimit() : 0) + limitBytes); + break; + } default: Y_ABORT("Unhandled consumer"); } @@ -517,6 +512,23 @@ class TMemoryController : public TActorBootstrapped { result.CanZeroLimit = true; break; } + case EMemoryConsumerKind::ScanGroupedMemoryLimiter: { + result.ExactLimit = GetColumnTablesReadExecutionLimitBytes(Config, hardLimitBytes); + break; + } + case EMemoryConsumerKind::CompGroupedMemoryLimiter: { + result.ExactLimit = GetColumnTablesCompactionLimitBytes(Config, hardLimitBytes) * + NKikimr::NOlap::TGlobalLimits::GroupedMemoryLimiterCompactionLimitCoefficient; + break; + } + case EMemoryConsumerKind::BlobCache: { + result.ExactLimit = GetColumnTablesCacheLimitBytes(Config, hardLimitBytes) * NKikimr::NOlap::TGlobalLimits::BlobCacheCoefficient; + break; + } + case EMemoryConsumerKind::DataAccessorCache: { + result.ExactLimit = GetColumnTablesCacheLimitBytes(Config, hardLimitBytes) * NKikimr::NOlap::TGlobalLimits::DataAccessorCoefficient; + break; + } default: Y_ABORT("Unhandled consumer"); } diff --git a/ydb/core/protos/memory_stats.proto b/ydb/core/protos/memory_stats.proto index 260d531ad8f5..a32968462665 100644 --- a/ydb/core/protos/memory_stats.proto +++ b/ydb/core/protos/memory_stats.proto @@ -27,4 +27,12 @@ message TMemoryStats { optional uint64 QueryExecutionConsumption = 18; optional uint64 QueryExecutionLimit = 19; + + optional uint64 ColumnTablesReadExecutionConsumption = 20; + optional uint64 ColumnTablesReadExecutionLimit = 21; + optional uint64 ColumnTablesCompactionConsumption = 22; + optional uint64 ColumnTablesCompactionLimit = 23; + optional uint64 ColumnTablesCacheConsumption = 24; + optional uint64 ColumnTablesCacheLimit = 25; + } diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 3303da21e88b..f3f7e2025bd4 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -1207,11 +1207,6 @@ namespace Tests { const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); Runtime->RegisterService(NConveyorComposite::TServiceOperator::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); } - { - auto* actor = NOlap::NDataAccessorControl::TGeneralCache::CreateService(NGeneralCache::NPublic::TConfig::BuildDefault(), new ::NMonitoring::TDynamicCounters()); - const auto aid = Runtime->Register(actor, nodeIdx, appData.UserPoolId, TMailboxType::Revolving, 0); - Runtime->RegisterService(NOlap::NDataAccessorControl::TGeneralCache::MakeServiceId(Runtime->GetNodeId(nodeIdx)), aid, nodeIdx); - } Runtime->Register(CreateLabelsMaintainer({}), nodeIdx, appData.SystemPoolId, TMailboxType::Revolving, 0); auto sysViewService = NSysView::CreateSysViewServiceForTests(); diff --git a/ydb/core/tx/columnshard/blob_cache.cpp b/ydb/core/tx/columnshard/blob_cache.cpp index cb4cb4f7b120..14225ec3ce36 100644 --- a/ydb/core/tx/columnshard/blob_cache.cpp +++ b/ydb/core/tx/columnshard/blob_cache.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include @@ -134,6 +135,8 @@ class TBlobCache: public TActorBootstrapped { const TCounterPtr ReadRequests; const TCounterPtr ReadsInQueue; + TIntrusivePtr MemoryConsumer; + public: static constexpr auto ActorActivityType() { return NKikimrServices::TActivity::BLOB_CACHE_ACTOR; @@ -178,6 +181,8 @@ class TBlobCache: public TActorBootstrapped { LOG_S_NOTICE("MaxCacheDataSize: " << (i64)MaxCacheDataSize << " InFlightDataSize: " << (i64)InFlightDataSize); + Send(NMemory::MakeMemoryControllerId(), new NMemory::TEvConsumerRegister(NMemory::EMemoryConsumerKind::BlobCache)); + Become(&TBlobCache::StateFunc); ScheduleWakeup(); } @@ -191,10 +196,11 @@ class TBlobCache: public TActorBootstrapped { HFunc(TEvBlobCache::TEvReadBlobRangeBatch, Handle); HFunc(TEvBlobCache::TEvCacheBlobRange, Handle); HFunc(TEvBlobCache::TEvForgetBlob, Handle); - HFunc(TEvBlobCache::TEvUpdateMaxCacheDataSize, Handle); HFunc(TEvBlobStorage::TEvGetResult, Handle); HFunc(TEvTabletPipe::TEvClientConnected, Handle); HFunc(TEvTabletPipe::TEvClientDestroyed, Handle); + HFunc(NMemory::TEvConsumerRegistered, Handle); + HFunc(NMemory::TEvConsumerLimit, Handle); default: LOG_S_WARN("Unhandled event type: " << ev->GetTypeRewrite() << " event: " << ev->ToString()); @@ -332,10 +338,16 @@ class TBlobCache: public TActorBootstrapped { } CachedRanges.erase(begin, end); + + UpdateConsumption(); } - void Handle(TEvBlobCache::TEvUpdateMaxCacheDataSize::TPtr& ev, const TActorContext&) { - const i64 newMaxCacheDataSize = ev->Get()->MaxCacheDataSize; + void Handle(NMemory::TEvConsumerRegistered::TPtr& ev, const TActorContext&) { + MemoryConsumer = std::move(ev->Get()->Consumer); + } + + void Handle(NMemory::TEvConsumerLimit::TPtr& ev, const TActorContext&) { + const i64 newMaxCacheDataSize = ev->Get()->LimitBytes; if (newMaxCacheDataSize == (i64)MaxCacheDataSize) { return; } @@ -345,6 +357,14 @@ class TBlobCache: public TActorBootstrapped { MaxCacheDataSize = newMaxCacheDataSize; } + void UpdateConsumption() { + if (!MemoryConsumer) { + return; + } + + MemoryConsumer->SetConsumption(CacheDataSize); + } + void SendBatchReadRequestToDS(const std::vector& blobRanges, const ui64 cookie, ui32 dsGroup, TReadItem::EReadVariant readVariant, const TActorContext& ctx) { @@ -579,6 +599,8 @@ class TBlobCache: public TActorBootstrapped { SizeBytes->Add(blobRange.Size); SizeBlobs->Inc(); } + + UpdateConsumption(); } void Evict(const TActorContext&) { @@ -603,6 +625,8 @@ class TBlobCache: public TActorBootstrapped { SizeBytes->Set(CacheDataSize); SizeBlobs->Set(Cache.Size()); } + + UpdateConsumption(); } }; diff --git a/ydb/core/tx/columnshard/blob_cache.h b/ydb/core/tx/columnshard/blob_cache.h index 2d2a2a78e536..61aa6edcf9e0 100644 --- a/ydb/core/tx/columnshard/blob_cache.h +++ b/ydb/core/tx/columnshard/blob_cache.h @@ -39,7 +39,6 @@ struct TEvBlobCache { EvReadBlobRangeResult, EvCacheBlobRange, EvForgetBlob, - EvUpdateMemoryLimit, EvEnd }; @@ -109,14 +108,6 @@ struct TEvBlobCache { : BlobId(blobId) {} }; - - struct TEvUpdateMaxCacheDataSize: public NActors::TEventLocal { - i64 MaxCacheDataSize = 0; - - explicit TEvUpdateMaxCacheDataSize(const i64 maxCacheDataSize) - : MaxCacheDataSize(maxCacheDataSize) { - } - }; }; inline diff --git a/ydb/core/tx/columnshard/data_accessor/cache_policy/policy.h b/ydb/core/tx/columnshard/data_accessor/cache_policy/policy.h index cac4b4ecebde..41bfdb5097b4 100644 --- a/ydb/core/tx/columnshard/data_accessor/cache_policy/policy.h +++ b/ydb/core/tx/columnshard/data_accessor/cache_policy/policy.h @@ -1,4 +1,6 @@ #pragma once + +#include #include #include #include @@ -72,6 +74,10 @@ class TPortionsMetadataCachePolicy { static std::shared_ptr> BuildObjectsProcessor( const NActors::TActorId& serviceActorId); + + static NMemory::EMemoryConsumerKind GetConsumerKind() { + return NMemory::EMemoryConsumerKind::DataAccessorCache; + } }; } // namespace NKikimr::NOlap::NGeneralCache diff --git a/ydb/core/tx/general_cache/service/manager.h b/ydb/core/tx/general_cache/service/manager.h index 44ce5264cc43..bf6af41f5951 100644 --- a/ydb/core/tx/general_cache/service/manager.h +++ b/ydb/core/tx/general_cache/service/manager.h @@ -1,6 +1,7 @@ #pragma once #include "counters.h" +#include #include #include #include @@ -274,6 +275,8 @@ class TManager { THashMap SourcesInfo; + TIntrusivePtr MemoryConsumer; + void DrainQueue(const TSourceId sourceId) { MutableSourceInfo(sourceId).DrainQueue(); } @@ -314,7 +317,12 @@ class TManager { Cache.Insert(i.first, i.second); } Counters->CacheSizeCount->Set(Cache.Size()); - Counters->CacheSizeBytes->Set(Cache.TotalSize()); + + const auto cacheTotalSize = Cache.TotalSize(); + Counters->CacheSizeBytes->Set(cacheTotalSize); + if (MemoryConsumer) { + MemoryConsumer->SetConsumption(cacheTotalSize); + } } public: @@ -422,6 +430,10 @@ class TManager { } Cache.SetMaxSize(maxCacheSize); } + + void SetMemoryConsumer(TIntrusivePtr consumer) { + MemoryConsumer = std::move(consumer); + } }; } // namespace NKikimr::NGeneralCache::NPrivate diff --git a/ydb/core/tx/general_cache/service/service.h b/ydb/core/tx/general_cache/service/service.h index 33404a0a83ab..c0883d5a63aa 100644 --- a/ydb/core/tx/general_cache/service/service.h +++ b/ydb/core/tx/general_cache/service/service.h @@ -2,6 +2,7 @@ #include "counters.h" #include "manager.h" +#include #include #include #include @@ -27,8 +28,12 @@ class TDistributor: public TActorBootstrapped> { Manager->AddRequest(std::make_shared(ev->Get()->ExtractAddresses(), ev->Get()->ExtractCallback(), ev->Get()->GetConsumer())); } - void HandleMain(NPublic::TEvents::TEvUpdateMaxCacheSize::TPtr& ev) { - Manager->UpdateMaxCacheSize(ev->Get()->GetMaxCacheSize()); + void HandleMain(NMemory::TEvConsumerRegistered::TPtr& ev) { + Manager->SetMemoryConsumer(std::move(ev->Get()->Consumer)); + } + + void HandleMain(NMemory::TEvConsumerLimit::TPtr& ev) { + Manager->UpdateMaxCacheSize(ev->Get()->LimitBytes); } void HandleMain(NSource::TEvents::TEvObjectsInfo::TPtr& ev) { @@ -54,10 +59,11 @@ class TDistributor: public TActorBootstrapped> { switch (ev->GetTypeRewrite()) { hFunc(NPublic::TEvents::TEvAskData, HandleMain); hFunc(NPublic::TEvents::TEvKillSource, HandleMain); - hFunc(NPublic::TEvents::TEvUpdateMaxCacheSize, HandleMain); hFunc(NSource::TEvents::TEvObjectsInfo, HandleMain); hFunc(NSource::TEvents::TEvAdditionalObjectsInfo, HandleMain); hFunc(NActors::TEvents::TEvUndelivered, HandleMain); + hFunc(NMemory::TEvConsumerRegistered, HandleMain); + hFunc(NMemory::TEvConsumerLimit, HandleMain); default: AFL_ERROR(NKikimrServices::TX_CONVEYOR)("problem", "unexpected event for general cache")("ev_type", ev->GetTypeName()); break; @@ -74,6 +80,9 @@ class TDistributor: public TActorBootstrapped> { void Bootstrap() { Manager = std::make_unique(TBase::SelfId(), Counters.GetManager()); + + TBase::Send(NMemory::MakeMemoryControllerId(), new NMemory::TEvConsumerRegister(TPolicy::GetConsumerKind())); + TBase::Become(&TDistributor::StateMain); } }; diff --git a/ydb/core/tx/general_cache/usage/events.h b/ydb/core/tx/general_cache/usage/events.h index 8dfd3bb36aa9..d848254ee730 100644 --- a/ydb/core/tx/general_cache/usage/events.h +++ b/ydb/core/tx/general_cache/usage/events.h @@ -20,7 +20,6 @@ struct TEvents { enum EEv { EvAskData = EventSpaceBegin(TKikimrEvents::ES_GENERAL_CACHE_PUBLIC), EvKillSource, - EvUpdateMaxCacheSize, EvEnd }; @@ -68,16 +67,6 @@ struct TEvents { return SourceId; } }; - - class TEvUpdateMaxCacheSize: public NActors::TEventLocal { - private: - YDB_READONLY_CONST(ui64, MaxCacheSize); - - public: - TEvUpdateMaxCacheSize(const ui64 maxCacheSize) - : MaxCacheSize(maxCacheSize) { - } - }; }; } // namespace NKikimr::NGeneralCache::NPublic diff --git a/ydb/core/tx/general_cache/usage/service.h b/ydb/core/tx/general_cache/usage/service.h index 52716b6b86c1..154b3ddacb3f 100644 --- a/ydb/core/tx/general_cache/usage/service.h +++ b/ydb/core/tx/general_cache/usage/service.h @@ -31,12 +31,6 @@ class TServiceOperator { context.Send(GetCurrentNodeServiceId(), new NPublic::TEvents::TEvAskData(consumer, std::move(addresses), std::move(callback))); } - static void UpdateMaxCacheSize(const ui64 maxCacheSize) { - AFL_VERIFY(NActors::TlsActivationContext); - auto& context = NActors::TActorContext::AsActorContext(); - context.Send(GetCurrentNodeServiceId(), new NPublic::TEvents::TEvUpdateMaxCacheSize(maxCacheSize)); - } - static void ModifyObjects(const TSourceId sourceId, THashMap&& add, THashSet&& remove) { AFL_VERIFY(NActors::TlsActivationContext); auto& context = NActors::TActorContext::AsActorContext(); diff --git a/ydb/core/tx/limiter/grouped_memory/service/actor.cpp b/ydb/core/tx/limiter/grouped_memory/service/actor.cpp index 3aa7e7a02ba2..87b728da3278 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/actor.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/actor.cpp @@ -4,6 +4,9 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { void TMemoryLimiterActor::Bootstrap() { Manager = std::make_shared(SelfId(), Config, Name, Signals, DefaultStage); + + Send(NMemory::MakeMemoryControllerId(), new NMemory::TEvConsumerRegister(ConsumerKind)); + Become(&TThis::StateWait); } @@ -47,8 +50,12 @@ void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvStartProcessScope::TPt Manager->RegisterProcessScope(ev->Get()->GetExternalProcessId(), ev->Get()->GetExternalScopeId()); } -void TMemoryLimiterActor::Handle(NEvents::TEvExternal::TEvUpdateMemoryLimits::TPtr& ev) { - Manager->UpdateMemoryLimits(ev->Get()->GetSoftMemoryLimit(), ev->Get()->GetHardMemoryLimit()); +void TMemoryLimiterActor::Handle(NMemory::TEvConsumerRegistered::TPtr& ev) { + Manager->SetMemoryConsumer(std::move(ev->Get()->Consumer)); +} + +void TMemoryLimiterActor::Handle(NMemory::TEvConsumerLimit::TPtr& ev) { + Manager->UpdateMemoryLimits(ev->Get()->LimitBytes, ev->Get()->HardLimitBytes); } } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/service/actor.h b/ydb/core/tx/limiter/grouped_memory/service/actor.h index b8fbc4dec488..644e413132d3 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/actor.h +++ b/ydb/core/tx/limiter/grouped_memory/service/actor.h @@ -2,6 +2,7 @@ #include "counters.h" #include "manager.h" +#include #include #include @@ -17,14 +18,16 @@ class TMemoryLimiterActor: public NActors::TActorBootstrapped Signals; const std::shared_ptr DefaultStage; + const NMemory::EMemoryConsumerKind ConsumerKind; public: TMemoryLimiterActor(const TConfig& config, const TString& name, const std::shared_ptr& signals, - const std::shared_ptr& defaultStage) + const std::shared_ptr& defaultStage, const NMemory::EMemoryConsumerKind consumerKind) : Config(config) , Name(name) , Signals(signals) - , DefaultStage(defaultStage) { + , DefaultStage(defaultStage) + , ConsumerKind(consumerKind) { } void Handle(NEvents::TEvExternal::TEvStartTask::TPtr& ev); @@ -36,7 +39,8 @@ class TMemoryLimiterActor: public NActors::TActorBootstrappedGetTypeName()); } diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp index a3383afc48a1..53499aae026e 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp @@ -148,10 +148,14 @@ void TManager::UnregisterProcessScope(const ui64 externalProcessId, const ui64 e RefreshSignals(); } -void TManager::UpdateMemoryLimits(const ui64 limit, const ui64 hardLimit) { - if (!DefaultStage) { - return; - } +void TManager::SetMemoryConsumer(TIntrusivePtr consumer) { + AFL_ENSURE(DefaultStage); + + DefaultStage->SetMemoryConsumer(std::move(consumer)); +} + +void TManager::UpdateMemoryLimits(const ui64 limit, const std::optional& hardLimit) { + AFL_ENSURE(DefaultStage); bool isLimitIncreased = false; DefaultStage->UpdateMemoryLimits(limit, hardLimit, isLimitIncreased); diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.h b/ydb/core/tx/limiter/grouped_memory/service/manager.h index 2ba7a2a78dbe..0e5347c18342 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/manager.h +++ b/ydb/core/tx/limiter/grouped_memory/service/manager.h @@ -2,6 +2,7 @@ #include "counters.h" #include "process.h" +#include #include #include @@ -103,7 +104,8 @@ class TManager { void UnregisterAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId); void UpdateAllocation(const ui64 externalProcessId, const ui64 externalScopeId, const ui64 allocationId, const ui64 volume); - void UpdateMemoryLimits(const ui64 limit, const ui64 hardLimit); + void SetMemoryConsumer(TIntrusivePtr consumer); + void UpdateMemoryLimits(const ui64 limit, const std::optional& hardLimit); bool IsEmpty() const { return Processes.empty(); diff --git a/ydb/core/tx/limiter/grouped_memory/usage/events.h b/ydb/core/tx/limiter/grouped_memory/usage/events.h index 1071a8ccacac..d3a8200c584c 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/events.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/events.h @@ -19,7 +19,6 @@ struct TEvExternal { EvFinishAllocationProcess, EvStartAllocationProcessScope, EvFinishAllocationProcessScope, - EvUpdateMemoryLimits, EvEnd }; @@ -147,17 +146,5 @@ struct TEvExternal { , ExternalScopeId(externalScopeId) { } }; - - class TEvUpdateMemoryLimits: public NActors::TEventLocal { - private: - YDB_READONLY(ui64, SoftMemoryLimit, 0); - YDB_READONLY(ui64, HardMemoryLimit, 0); - - public: - explicit TEvUpdateMemoryLimits(const ui64 softMemoryLimit, const ui64 hardMemoryLimit) - : SoftMemoryLimit(softMemoryLimit) - , HardMemoryLimit(hardMemoryLimit) { - } - }; }; } // namespace NKikimr::NOlap::NGroupedMemoryManager::NEvents diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h index 2a18744bf612..8e7608bc115c 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/service.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h @@ -3,6 +3,7 @@ #include "config.h" #include "events.h" +#include #include #include @@ -29,6 +30,10 @@ class TServiceOperatorImpl { return TMemoryLimiterPolicy::Name; } + static NMemory::EMemoryConsumerKind GetConsumerKind() { + return TMemoryLimiterPolicy::ConsumerKind; + } + public: static std::shared_ptr BuildStageFeatures(const TString& name, const ui64 limit) { if (!IsEnabled()) { @@ -89,13 +94,14 @@ class TServiceOperatorImpl { } static NActors::IActor* CreateService(const TConfig& config, TIntrusivePtr<::NMonitoring::TDynamicCounters> signals) { Register(config, signals); - return new TMemoryLimiterActor(config, GetMemoryLimiterName(), Singleton()->Counters, Singleton()->DefaultStageFeatures); + return new TMemoryLimiterActor(config, GetMemoryLimiterName(), Singleton()->Counters, Singleton()->DefaultStageFeatures, GetConsumerKind()); } }; class TScanMemoryLimiterPolicy { public: static const inline TString Name = "Scan"; + static const inline NMemory::EMemoryConsumerKind ConsumerKind = NMemory::EMemoryConsumerKind::ScanGroupedMemoryLimiter; }; using TScanMemoryLimiterOperator = TServiceOperatorImpl; @@ -103,6 +109,7 @@ using TScanMemoryLimiterOperator = TServiceOperatorImpl; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp index 80ea70f0315d..11a2c278de96 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp +++ b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.cpp @@ -36,6 +36,7 @@ TConclusionStatus TStageFeatures::Allocate(const ui64 volume) { auto* current = this; while (current) { current->Waiting.Sub(volume); + UpdateConsumption(current); if (current->Counters) { current->Counters->Sub(volume, false); } @@ -60,6 +61,7 @@ TConclusionStatus TStageFeatures::Allocate(const ui64 volume) { auto* current = this; while (current) { current->Usage.Add(volume); + UpdateConsumption(current); AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "allocate")("usage", current->Usage.Val())( "delta", volume); if (current->Counters) { @@ -82,6 +84,7 @@ void TStageFeatures::Free(const ui64 volume, const bool allocated) { } else { current->Waiting.Sub(volume); } + UpdateConsumption(current); AFL_DEBUG(NKikimrServices::GROUPED_MEMORY_LIMITER)("name", current->Name)("event", "free")("usage", current->Usage.Val())( "delta", volume); current = current->Owner.get(); @@ -106,6 +109,8 @@ void TStageFeatures::UpdateVolume(const ui64 from, const ui64 to, const bool all if (Owner) { Owner->UpdateVolume(from, to, allocated); } + + UpdateConsumption(this); } bool TStageFeatures::IsAllocatable(const ui64 volume, const ui64 additional) const { @@ -131,13 +136,34 @@ void TStageFeatures::Add(const ui64 volume, const bool allocated) { if (Owner) { Owner->Add(volume, allocated); } + + UpdateConsumption(this); +} + +void TStageFeatures::SetMemoryConsumer(TIntrusivePtr consumer) { + MemoryConsumer = std::move(consumer); } -void TStageFeatures::UpdateMemoryLimits(const ui64 limit, const ui64 hardLimit, bool& isLimitIncreased) { +void TStageFeatures::UpdateMemoryLimits(const ui64 limit, const std::optional& hardLimit, bool& isLimitIncreased) { isLimitIncreased = limit > Limit; Limit = limit; HardLimit = hardLimit; + + if (Counters) { + Counters->ValueSoftLimit->Set(Limit); + if (HardLimit) { + Counters->ValueHardLimit->Set(*HardLimit); + } + } +} + +void TStageFeatures::UpdateConsumption(const TStageFeatures* current) const { + if (!current || !current->MemoryConsumer) { + return; + } + + current->MemoryConsumer->SetConsumption(current->Usage.Val()); } } // namespace NKikimr::NOlap::NGroupedMemoryManager diff --git a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h index 53ee9b645fff..743c4b61cd09 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/stage_features.h @@ -1,4 +1,6 @@ #pragma once + +#include #include #include @@ -15,6 +17,9 @@ class TStageFeatures { YDB_ACCESSOR_DEF(TPositiveControlInteger, Waiting); std::shared_ptr Owner; std::shared_ptr Counters; + TIntrusivePtr MemoryConsumer; + + void UpdateConsumption(const TStageFeatures* current) const; public: TString DebugString() const; @@ -33,7 +38,8 @@ class TStageFeatures { bool IsAllocatable(const ui64 volume, const ui64 additional) const; void Add(const ui64 volume, const bool allocated); - void UpdateMemoryLimits(const ui64 limit, const ui64 hardLimit, bool& isLimitIncreased); + void SetMemoryConsumer(TIntrusivePtr consumer); + void UpdateMemoryLimits(const ui64 limit, const std::optional& hardLimit, bool& isLimitIncreased); }; } // namespace NKikimr::NOlap::NGroupedMemoryManager