diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index ea00a286ef0a..0214b1d2e2f8 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -628,8 +628,11 @@ void TColumnShard::StartCompaction(const std::shared_ptrGetVersionedIndexReadonlyCopy(); + static std::shared_ptr stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo, - NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier()); + NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard); auto env = std::make_shared(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput), std::make_shared( @@ -749,8 +752,11 @@ bool TColumnShard::SetupTtl() { auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); for (auto&& i : indexChanges) { i->Start(*this); + static std::shared_ptr stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); NOlap::NDataFetcher::TRequestInput rInput( - i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier()); + i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier(), processGuard); auto env = std::make_shared(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); if (i->NeedConstruction()) { NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput), @@ -790,8 +796,11 @@ void TColumnShard::SetupCleanupPortions() { changes->Start(*this); auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy(); - NOlap::NDataFetcher::TRequestInput rInput( - changes->GetPortionsToAccess(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier()); + static std::shared_ptr stageFeatures = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); + auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); + NOlap::NDataFetcher::TRequestInput rInput(changes->GetPortionsToAccess(), actualIndexInfo, + NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier(), processGuard); auto env = std::make_shared(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager); NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching(std::move(rInput), std::make_shared( diff --git a/ydb/core/tx/columnshard/data_reader/contexts.cpp b/ydb/core/tx/columnshard/data_reader/contexts.cpp index 8b3fb0c1ec64..b47f3ebb4963 100644 --- a/ydb/core/tx/columnshard/data_reader/contexts.cpp +++ b/ydb/core/tx/columnshard/data_reader/contexts.cpp @@ -15,10 +15,10 @@ IFetchingStep::EStepResult IFetchingStep::Execute(const std::shared_ptr& portions, const std::shared_ptr& versions, const NBlobOperations::EConsumer consumer, const TString& externalTaskId, - const std::optional& memoryProcessInfo) + const std::shared_ptr& memoryProcessGuard) : Consumer(consumer) , ExternalTaskId(externalTaskId) - , MemoryProcessInfo(memoryProcessInfo ? *memoryProcessInfo : TFetcherMemoryProcessInfo()) + , MemoryProcessGuard(memoryProcessGuard) { AFL_VERIFY(portions.size()); ActualSchema = versions->GetLastSchema(); diff --git a/ydb/core/tx/columnshard/data_reader/contexts.h b/ydb/core/tx/columnshard/data_reader/contexts.h index a01bde60e0d7..83a08a23ee40 100644 --- a/ydb/core/tx/columnshard/data_reader/contexts.h +++ b/ydb/core/tx/columnshard/data_reader/contexts.h @@ -21,41 +21,28 @@ enum class EFetchingStage : ui32 { Error }; -class TFetcherMemoryProcessInfo { -private: - static inline TAtomicCounter Counter = 0; - ui64 MemoryProcessId = Counter.Inc(); - -public: - ui64 GetMemoryProcessId() const { - return MemoryProcessId; - } -}; - class TCurrentContext: TMoveOnly { private: std::optional> Accessors; YDB_READONLY_DEF(std::vector>, ResourceGuards); std::shared_ptr MemoryProcessGuard; - std::shared_ptr MemoryProcessScopeGuard; - std::shared_ptr MemoryProcessGroupGuard; - TFetcherMemoryProcessInfo MemoryProcessInfo; + std::shared_ptr MemoryScopeGuard; + std::shared_ptr MemoryGroupGuard; std::optional Blobs; std::optional> AssembledData; + inline static TAtomicCounter MemoryScopeIdCounter = 0; public: ui64 GetMemoryProcessId() const { - return MemoryProcessInfo.GetMemoryProcessId(); + return MemoryProcessGuard->GetProcessId(); } ui64 GetMemoryScopeId() const { - AFL_VERIFY(!!MemoryProcessScopeGuard); - return MemoryProcessScopeGuard->GetScopeId(); + return MemoryScopeGuard->GetScopeId(); } ui64 GetMemoryGroupId() const { - AFL_VERIFY(!!MemoryProcessGroupGuard); - return MemoryProcessGroupGuard->GetGroupId(); + return MemoryGroupGuard->GetGroupId(); } void SetBlobs(NBlobOperations::NRead::TCompositeReadBlobs&& blobs) { @@ -91,15 +78,12 @@ class TCurrentContext: TMoveOnly { return result; } - TCurrentContext(const TFetcherMemoryProcessInfo& memoryProcessInfo) - : MemoryProcessInfo(memoryProcessInfo) + TCurrentContext(const std::shared_ptr& memoryProcessGuard) + : MemoryProcessGuard(memoryProcessGuard) { - static std::shared_ptr stageFeatures = - NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); - - MemoryProcessGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard(GetMemoryProcessId(), { stageFeatures }); - MemoryProcessScopeGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildScopeGuard(GetMemoryProcessId(), 1); - MemoryProcessGroupGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildGroupGuard(GetMemoryProcessId(), 1); + AFL_VERIFY(MemoryProcessGuard); + MemoryScopeGuard = MemoryProcessGuard->BuildScopeGuard(MemoryScopeIdCounter.Inc()); + MemoryGroupGuard = MemoryScopeGuard->BuildGroupGuard(); } void SetPortionAccessors(std::vector&& acc) { @@ -261,12 +245,12 @@ class TRequestInput { YDB_READONLY_DEF(std::shared_ptr, ActualSchema); YDB_READONLY(NBlobOperations::EConsumer, Consumer, NBlobOperations::EConsumer::UNDEFINED); YDB_READONLY_DEF(TString, ExternalTaskId); - YDB_READONLY_DEF(TFetcherMemoryProcessInfo, MemoryProcessInfo); + YDB_READONLY_DEF(std::shared_ptr, MemoryProcessGuard); public: TRequestInput(const std::vector& portions, const std::shared_ptr& versions, const NBlobOperations::EConsumer consumer, const TString& externalTaskId, - const std::optional& memoryProcessInfo = std::nullopt); + const std::shared_ptr& memoryProcessGuard); }; } // namespace NKikimr::NOlap::NDataFetcher diff --git a/ydb/core/tx/columnshard/data_reader/fetcher.h b/ydb/core/tx/columnshard/data_reader/fetcher.h index 50bb0f4ee11a..53c6e55a3569 100644 --- a/ydb/core/tx/columnshard/data_reader/fetcher.h +++ b/ydb/core/tx/columnshard/data_reader/fetcher.h @@ -52,7 +52,7 @@ class TCounters: public NColumnShard::TCommonCountersOwner { class TPortionsDataFetcher: TNonCopyable { private: - const TRequestInput Input; + TRequestInput Input; const std::shared_ptr Callback; std::shared_ptr ClassCounters; NCounters::TStateSignalsOperator::TGuard Guard; @@ -80,7 +80,7 @@ class TPortionsDataFetcher: TNonCopyable { , ClassCounters(Singleton()->GetClassCounters(Callback->GetClassName())) , Guard(ClassCounters->GetGuard(EFetchingStage::Created)) , Script(script) - , CurrentContext(input.GetMemoryProcessInfo()) + , CurrentContext(Input.GetMemoryProcessGuard()) , Environment(environment) , ConveyorCategory(conveyorCategory) { diff --git a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp index 24fadcc32d61..4b805aef4f6a 100644 --- a/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp +++ b/ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp @@ -41,8 +41,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr& co NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit) }; ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages); - ProcessScopeGuard = - NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId()); + ProcessScopeGuard = ProcessMemoryGuard->BuildScopeGuard(GetCommonContext()->GetScanId()); auto readSchema = ReadMetadata->GetResultSchema(); SpecColumns = std::make_shared(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp index a431b81077f4..00b5e6f89dfa 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/interval.cpp @@ -38,8 +38,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi , TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard()) , Sources(sources) , IntervalIdx(intervalIdx) - , IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( - Context->GetProcessMemoryControlId(), context->GetCommonContext()->GetScanId())) + , IntervalGroupGuard(Context->GetProcessScopeGuard()->BuildGroupGuard()) , IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) { AFL_VERIFY(Sources.size()); for (auto&& [_, i] : Sources) { diff --git a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp index c8c331f3e882..e37e0b92a82d 100644 --- a/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/simple_reader/iterator/source.cpp @@ -33,8 +33,7 @@ void IDataSource::StartProcessing(const std::shared_ptr& sourcePtr) InitStageData(std::make_unique( GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCount())); ProcessingStarted = true; - SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard( - GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId()); + SourceGroupGuard = GetContext()->GetProcessScopeGuard()->BuildGroupGuard(); SetMemoryGroupId(SourceGroupGuard->GetGroupId()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx()); // NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan")); diff --git a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp index d602958b80ee..2ec8cd20ddcf 100644 --- a/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp +++ b/ydb/core/tx/columnshard/engines/reader/sys_view/chunks/chunks.cpp @@ -226,9 +226,13 @@ class TFetchingExecutor: public NOlap::NDataFetcher::IFetchCallback { } // namespace TConclusionStatus TStatsIterator::Start() { + static std::shared_ptr stageFeatures = + NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000); std::vector portions; std::shared_ptr actualIndexInfo; auto env = std::make_shared(Context->GetDataAccessorsManager(), Context->GetStoragesManager()); + auto processGuard = + NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures }); for (auto&& i : IndexGranules) { for (auto&& p : i.GetPortions()) { portions.emplace_back(p); @@ -236,8 +240,8 @@ TConclusionStatus TStatsIterator::Start() { if (!actualIndexInfo) { actualIndexInfo = ReadMetadata->GetIndexVersionsPtr(); } - NOlap::NDataFetcher::TRequestInput rInput( - std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, ::ToString(ReadMetadata->GetTxId())); + NOlap::NDataFetcher::TRequestInput rInput(std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, + ::ToString(ReadMetadata->GetTxId()), processGuard); NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching( std::move(rInput), std::make_shared(Context), env, NConveyorComposite::ESpecialTaskCategory::Scan); portions.clear(); @@ -249,7 +253,7 @@ TConclusionStatus TStatsIterator::Start() { actualIndexInfo = ReadMetadata->GetIndexVersionsPtr(); } NOlap::NDataFetcher::TRequestInput rInput( - std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, ::ToString(ReadMetadata->GetTxId())); + std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, ::ToString(ReadMetadata->GetTxId()), processGuard); NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching( std::move(rInput), std::make_shared(Context), env, NConveyorComposite::ESpecialTaskCategory::Scan); } diff --git a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp index a3383afc48a1..633094fe665f 100644 --- a/ydb/core/tx/limiter/grouped_memory/service/manager.cpp +++ b/ydb/core/tx/limiter/grouped_memory/service/manager.cpp @@ -96,6 +96,7 @@ void TManager::RegisterAllocation(const ui64 externalProcessId, const ui64 exter process->RegisterAllocation(externalScopeId, externalGroupId, task, stageIdx); } else { AFL_VERIFY(!task->OnAllocated(std::make_shared(externalProcessId, externalScopeId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))( + "process", externalProcessId)("scope", externalScopeId)( "ext_group", externalGroupId)("stage_idx", stageIdx); } RefreshSignals(); diff --git a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h index eeb4169eec7d..6dbd78ce9d0a 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/abstract.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/abstract.h @@ -12,6 +12,10 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { +class TGroupGuard; +class TScopeGuard; +class TProcessGuard; + class TGroupGuard { private: const NActors::TActorId ActorId; @@ -33,6 +37,10 @@ class TProcessGuard { public: TProcessGuard(const NActors::TActorId& actorId, const ui64 processId, const std::vector>& stages); + std::shared_ptr BuildScopeGuard(const ui32 scopeId) const { + return std::make_shared(ActorId, ProcessId, scopeId); + } + ~TProcessGuard(); }; @@ -45,6 +53,11 @@ class TScopeGuard { public: TScopeGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 scopeId); + std::shared_ptr BuildGroupGuard() const { + static TAtomicCounter counter = 0; + return std::make_shared(ActorId, ProcessId, ScopeId, counter.Inc()); + } + ~TScopeGuard(); }; diff --git a/ydb/core/tx/limiter/grouped_memory/usage/service.h b/ydb/core/tx/limiter/grouped_memory/usage/service.h index 2a18744bf612..4bae125f0d5b 100644 --- a/ydb/core/tx/limiter/grouped_memory/usage/service.h +++ b/ydb/core/tx/limiter/grouped_memory/usage/service.h @@ -13,6 +13,7 @@ namespace NKikimr::NOlap::NGroupedMemoryManager { template class TServiceOperatorImpl { private: + TAtomicCounter LastProcessId = 0; TConfig ServiceConfig = TConfig::BuildDisabledConfig(); std::shared_ptr Counters; std::shared_ptr DefaultStageFeatures = @@ -45,20 +46,18 @@ class TServiceOperatorImpl { return Singleton()->DefaultStageFeatures; } - static std::shared_ptr BuildGroupGuard(const ui64 processId, const ui32 scopeId) { - static TAtomicCounter counter = 0; + static std::shared_ptr BuildProcessGuard(const std::vector>& stages) + requires(!TMemoryLimiterPolicy::ExternalProcessIdAllocation) + { + ui64 processId = Singleton()->LastProcessId.Inc(); auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; - return std::make_shared(MakeServiceId(selfId.NodeId()), processId, scopeId, counter.Inc()); - } - - static std::shared_ptr BuildScopeGuard(const ui64 processId, const ui32 scopeId) { - auto& context = NActors::TActorContext::AsActorContext(); - const NActors::TActorId& selfId = context.SelfID; - return std::make_shared(MakeServiceId(selfId.NodeId()), processId, scopeId); + return std::make_shared(MakeServiceId(selfId.NodeId()), processId, stages); } - static std::shared_ptr BuildProcessGuard(const ui64 processId, const std::vector>& stages) { + static std::shared_ptr BuildProcessGuard(const ui64 processId, const std::vector>& stages) + requires(TMemoryLimiterPolicy::ExternalProcessIdAllocation) + { auto& context = NActors::TActorContext::AsActorContext(); const NActors::TActorId& selfId = context.SelfID; return std::make_shared(MakeServiceId(selfId.NodeId()), processId, stages); @@ -96,6 +95,7 @@ class TServiceOperatorImpl { class TScanMemoryLimiterPolicy { public: static const inline TString Name = "Scan"; + static const bool ExternalProcessIdAllocation = true; }; using TScanMemoryLimiterOperator = TServiceOperatorImpl; @@ -103,6 +103,7 @@ using TScanMemoryLimiterOperator = TServiceOperatorImpl;