Skip to content

change memory control in cs data fetcher #20732

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 13 additions & 4 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -628,8 +628,11 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo
compaction.Start(*this);

auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
NOlap::NDataFetcher::TRequestInput rInput(compaction.GetSwitchedPortions(), actualIndexInfo,
NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier());
NOlap::NBlobOperations::EConsumer::GENERAL_COMPACTION, compaction.GetTaskIdentifier(), processGuard);
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
std::make_shared<TCompactionExecutor>(
Expand Down Expand Up @@ -749,8 +752,11 @@ bool TColumnShard::SetupTtl() {
auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
for (auto&& i : indexChanges) {
i->Start(*this);
static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
NOlap::NDataFetcher::TRequestInput rInput(
i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier());
i->GetPortionsInfo(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::TTL, i->GetTaskIdentifier(), processGuard);
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
if (i->NeedConstruction()) {
NOlap::NDataFetcher::TPortionsDataFetcher::StartFullPortionsFetching(std::move(rInput),
Expand Down Expand Up @@ -790,8 +796,11 @@ void TColumnShard::SetupCleanupPortions() {
changes->Start(*this);

auto actualIndexInfo = TablesManager.GetPrimaryIndex()->GetVersionedIndexReadonlyCopy();
NOlap::NDataFetcher::TRequestInput rInput(
changes->GetPortionsToAccess(), actualIndexInfo, NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier());
static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
auto processGuard = NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
NOlap::NDataFetcher::TRequestInput rInput(changes->GetPortionsToAccess(), actualIndexInfo,
NOlap::NBlobOperations::EConsumer::CLEANUP_PORTIONS, changes->GetTaskIdentifier(), processGuard);
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(DataAccessorsManager.GetObjectPtrVerified(), StoragesManager);
NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching(std::move(rInput),
std::make_shared<TCompactionExecutor>(
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/data_reader/contexts.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ IFetchingStep::EStepResult IFetchingStep::Execute(const std::shared_ptr<TPortion

TRequestInput::TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<TVersionedIndex>& versions,
const NBlobOperations::EConsumer consumer, const TString& externalTaskId,
const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo)
const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard)
: Consumer(consumer)
, ExternalTaskId(externalTaskId)
, MemoryProcessInfo(memoryProcessInfo ? *memoryProcessInfo : TFetcherMemoryProcessInfo())
, MemoryProcessGuard(memoryProcessGuard)
{
AFL_VERIFY(portions.size());
ActualSchema = versions->GetLastSchema();
Expand Down
42 changes: 13 additions & 29 deletions ydb/core/tx/columnshard/data_reader/contexts.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,41 +21,28 @@ enum class EFetchingStage : ui32 {
Error
};

// Holds a memory-process identifier taken from a class-wide atomic counter
// when the object is default-constructed. No copy operations are declared,
// so a copy carries the same identifier as its source.
class TFetcherMemoryProcessInfo {
public:
    // Returns the identifier assigned to this instance at construction.
    ui64 GetMemoryProcessId() const {
        return MemoryProcessId;
    }

private:
    // Shared generator: each default-constructed instance takes the next value.
    inline static TAtomicCounter Counter = 0;
    ui64 MemoryProcessId = Counter.Inc();
};

class TCurrentContext: TMoveOnly {
private:
std::optional<std::vector<TPortionDataAccessor>> Accessors;
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
std::shared_ptr<NGroupedMemoryManager::TProcessGuard> MemoryProcessGuard;
std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryProcessScopeGuard;
std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryProcessGroupGuard;
TFetcherMemoryProcessInfo MemoryProcessInfo;
std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryScopeGuard;
std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryGroupGuard;
std::optional<NBlobOperations::NRead::TCompositeReadBlobs> Blobs;
std::optional<std::vector<NArrow::TGeneralContainer>> AssembledData;
inline static TAtomicCounter MemoryScopeIdCounter = 0;

public:
ui64 GetMemoryProcessId() const {
return MemoryProcessInfo.GetMemoryProcessId();
return MemoryProcessGuard->GetProcessId();
}

ui64 GetMemoryScopeId() const {
AFL_VERIFY(!!MemoryProcessScopeGuard);
return MemoryProcessScopeGuard->GetScopeId();
return MemoryScopeGuard->GetScopeId();
}

ui64 GetMemoryGroupId() const {
AFL_VERIFY(!!MemoryProcessGroupGuard);
return MemoryProcessGroupGuard->GetGroupId();
return MemoryGroupGuard->GetGroupId();
}

void SetBlobs(NBlobOperations::NRead::TCompositeReadBlobs&& blobs) {
Expand Down Expand Up @@ -91,15 +78,12 @@ class TCurrentContext: TMoveOnly {
return result;
}

TCurrentContext(const TFetcherMemoryProcessInfo& memoryProcessInfo)
: MemoryProcessInfo(memoryProcessInfo)
TCurrentContext(const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard)
: MemoryProcessGuard(memoryProcessGuard)
{
static std::shared_ptr<NGroupedMemoryManager::TStageFeatures> stageFeatures =
NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);

MemoryProcessGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard(GetMemoryProcessId(), { stageFeatures });
MemoryProcessScopeGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildScopeGuard(GetMemoryProcessId(), 1);
MemoryProcessGroupGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildGroupGuard(GetMemoryProcessId(), 1);
AFL_VERIFY(MemoryProcessGuard);
MemoryScopeGuard = MemoryProcessGuard->BuildScopeGuard(MemoryScopeIdCounter.Inc());
MemoryGroupGuard = MemoryScopeGuard->BuildGroupGuard();
}

void SetPortionAccessors(std::vector<TPortionDataAccessor>&& acc) {
Expand Down Expand Up @@ -261,12 +245,12 @@ class TRequestInput {
YDB_READONLY_DEF(std::shared_ptr<ISnapshotSchema>, ActualSchema);
YDB_READONLY(NBlobOperations::EConsumer, Consumer, NBlobOperations::EConsumer::UNDEFINED);
YDB_READONLY_DEF(TString, ExternalTaskId);
YDB_READONLY_DEF(TFetcherMemoryProcessInfo, MemoryProcessInfo);
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TProcessGuard>, MemoryProcessGuard);

public:
TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<TVersionedIndex>& versions,
const NBlobOperations::EConsumer consumer, const TString& externalTaskId,
const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo = std::nullopt);
const std::shared_ptr<NGroupedMemoryManager::TProcessGuard>& memoryProcessGuard);
};

} // namespace NKikimr::NOlap::NDataFetcher
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/data_reader/fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TCounters: public NColumnShard::TCommonCountersOwner {

class TPortionsDataFetcher: TNonCopyable {
private:
const TRequestInput Input;
TRequestInput Input;
const std::shared_ptr<IFetchCallback> Callback;
std::shared_ptr<TClassCounters> ClassCounters;
NCounters::TStateSignalsOperator<EFetchingStage>::TGuard Guard;
Expand Down Expand Up @@ -80,7 +80,7 @@ class TPortionsDataFetcher: TNonCopyable {
, ClassCounters(Singleton<TCounters>()->GetClassCounters(Callback->GetClassName()))
, Guard(ClassCounters->GetGuard(EFetchingStage::Created))
, Script(script)
, CurrentContext(input.GetMemoryProcessInfo())
, CurrentContext(Input.GetMemoryProcessGuard())
, Environment(environment)
, ConveyorCategory(conveyorCategory)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit)
};
ProcessMemoryGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages);
ProcessScopeGuard =
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId());
ProcessScopeGuard = ProcessMemoryGuard->BuildScopeGuard(GetCommonContext()->GetScanId());

auto readSchema = ReadMetadata->GetResultSchema();
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,7 @@ TFetchingInterval::TFetchingInterval(const NArrow::NMerger::TSortableBatchPositi
, TaskGuard(Context->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
, Sources(sources)
, IntervalIdx(intervalIdx)
, IntervalGroupGuard(NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
Context->GetProcessMemoryControlId(), context->GetCommonContext()->GetScanId()))
, IntervalGroupGuard(Context->GetProcessScopeGuard()->BuildGroupGuard())
, IntervalStateGuard(Context->GetCommonContext()->GetCounters().CreateIntervalStateGuard()) {
AFL_VERIFY(Sources.size());
for (auto&& [_, i] : Sources) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
InitStageData(std::make_unique<TFetchedData>(
GetContext()->GetReadMetadata()->GetProgram().GetChainVerified()->HasAggregations(), sourcePtr->GetRecordsCount()));
ProcessingStarted = true;
SourceGroupGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildGroupGuard(
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
SourceGroupGuard = GetContext()->GetProcessScopeGuard()->BuildGroupGuard();
SetMemoryGroupId(SourceGroupGuard->GetGroupId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,18 +226,23 @@ class TFetchingExecutor: public NOlap::NDataFetcher::IFetchCallback {
} // namespace

TConclusionStatus TStatsIterator::Start() {
static std::shared_ptr<NOlap::NGroupedMemoryManager::TStageFeatures> stageFeatures =
NOlap::NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
std::vector<TPortionInfo::TConstPtr> portions;
std::shared_ptr<TVersionedIndex> actualIndexInfo;
auto env = std::make_shared<NOlap::NDataFetcher::TEnvironment>(Context->GetDataAccessorsManager(), Context->GetStoragesManager());
auto processGuard =
NOlap::NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard({ stageFeatures });
AFL_CRIT(NKikimrServices::TX_COLUMNSHARD)("aboba", "chuns")("process_id", processGuard->GetProcessId());
for (auto&& i : IndexGranules) {
for (auto&& p : i.GetPortions()) {
portions.emplace_back(p);
if (portions.size() == 100) {
if (!actualIndexInfo) {
actualIndexInfo = ReadMetadata->GetIndexVersionsPtr();
}
NOlap::NDataFetcher::TRequestInput rInput(
std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, ::ToString(ReadMetadata->GetTxId()));
NOlap::NDataFetcher::TRequestInput rInput(std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN,
::ToString(ReadMetadata->GetTxId()), processGuard);
NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching(
std::move(rInput), std::make_shared<TFetchingExecutor>(Context), env, NConveyorComposite::ESpecialTaskCategory::Scan);
portions.clear();
Expand All @@ -249,7 +254,7 @@ TConclusionStatus TStatsIterator::Start() {
actualIndexInfo = ReadMetadata->GetIndexVersionsPtr();
}
NOlap::NDataFetcher::TRequestInput rInput(
std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, ::ToString(ReadMetadata->GetTxId()));
std::move(portions), actualIndexInfo, NOlap::NBlobOperations::EConsumer::SYS_VIEW_SCAN, ::ToString(ReadMetadata->GetTxId()), processGuard);
NOlap::NDataFetcher::TPortionsDataFetcher::StartAccessorPortionsFetching(
std::move(rInput), std::make_shared<TFetchingExecutor>(Context), env, NConveyorComposite::ESpecialTaskCategory::Scan);
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/limiter/grouped_memory/service/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ void TManager::RegisterAllocation(const ui64 externalProcessId, const ui64 exter
process->RegisterAllocation(externalScopeId, externalGroupId, task, stageIdx);
} else {
AFL_VERIFY(!task->OnAllocated(std::make_shared<TAllocationGuard>(externalProcessId, externalScopeId, task->GetIdentifier(), OwnerActorId, task->GetMemory()), task))(
"process", externalProcessId)("scope", externalScopeId)(
"ext_group", externalGroupId)("stage_idx", stageIdx);
}
RefreshSignals();
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/tx/limiter/grouped_memory/usage/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@

namespace NKikimr::NOlap::NGroupedMemoryManager {

class TGroupGuard;
class TScopeGuard;
class TProcessGuard;

class TGroupGuard {
private:
const NActors::TActorId ActorId;
Expand All @@ -33,6 +37,10 @@ class TProcessGuard {
public:
TProcessGuard(const NActors::TActorId& actorId, const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages);

std::shared_ptr<TScopeGuard> BuildScopeGuard(const ui32 scopeId) const {
return std::make_shared<TScopeGuard>(ActorId, ProcessId, scopeId);
}

~TProcessGuard();
};

Expand All @@ -45,6 +53,11 @@ class TScopeGuard {
public:
TScopeGuard(const NActors::TActorId& actorId, const ui64 processId, const ui64 scopeId);

std::shared_ptr<TGroupGuard> BuildGroupGuard() const {
static TAtomicCounter counter = 0;
return std::make_shared<TGroupGuard>(ActorId, ProcessId, ScopeId, counter.Inc());
}

~TScopeGuard();
};

Expand Down
21 changes: 11 additions & 10 deletions ydb/core/tx/limiter/grouped_memory/usage/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace NKikimr::NOlap::NGroupedMemoryManager {
template <class TMemoryLimiterPolicy>
class TServiceOperatorImpl {
private:
TAtomicCounter LastProcessId = 0;
TConfig ServiceConfig = TConfig::BuildDisabledConfig();
std::shared_ptr<TCounters> Counters;
std::shared_ptr<TStageFeatures> DefaultStageFeatures =
Expand Down Expand Up @@ -45,20 +46,18 @@ class TServiceOperatorImpl {
return Singleton<TSelf>()->DefaultStageFeatures;
}

static std::shared_ptr<TGroupGuard> BuildGroupGuard(const ui64 processId, const ui32 scopeId) {
static TAtomicCounter counter = 0;
static std::shared_ptr<TProcessGuard> BuildProcessGuard(const std::vector<std::shared_ptr<TStageFeatures>>& stages)
requires(!TMemoryLimiterPolicy::ExternalProcessIdAllocation)
{
ui64 processId = Singleton<TSelf>()->LastProcessId.Inc();
auto& context = NActors::TActorContext::AsActorContext();
const NActors::TActorId& selfId = context.SelfID;
return std::make_shared<TGroupGuard>(MakeServiceId(selfId.NodeId()), processId, scopeId, counter.Inc());
}

static std::shared_ptr<TScopeGuard> BuildScopeGuard(const ui64 processId, const ui32 scopeId) {
auto& context = NActors::TActorContext::AsActorContext();
const NActors::TActorId& selfId = context.SelfID;
return std::make_shared<TScopeGuard>(MakeServiceId(selfId.NodeId()), processId, scopeId);
return std::make_shared<TProcessGuard>(MakeServiceId(selfId.NodeId()), processId, stages);
}

static std::shared_ptr<TProcessGuard> BuildProcessGuard(const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages) {
static std::shared_ptr<TProcessGuard> BuildProcessGuard(const ui64 processId, const std::vector<std::shared_ptr<TStageFeatures>>& stages)
requires(TMemoryLimiterPolicy::ExternalProcessIdAllocation)
{
auto& context = NActors::TActorContext::AsActorContext();
const NActors::TActorId& selfId = context.SelfID;
return std::make_shared<TProcessGuard>(MakeServiceId(selfId.NodeId()), processId, stages);
Expand Down Expand Up @@ -96,13 +95,15 @@ class TServiceOperatorImpl {
// Limiter policy used to instantiate TServiceOperatorImpl for the "Scan" service.
class TScanMemoryLimiterPolicy {
public:
    // Policy name string.
    static const inline TString Name = "Scan";
    // true: the BuildProcessGuard overload that takes a caller-provided process id
    // is the one enabled for this policy (the overloads are constrained on this flag).
    // Declared constexpr, like TCompMemoryLimiterPolicy, so the member is implicitly
    // inline and has a definition even if it is ever odr-used.
    static constexpr bool ExternalProcessIdAllocation = true;
};

using TScanMemoryLimiterOperator = TServiceOperatorImpl<TScanMemoryLimiterPolicy>;

// Limiter policy used to instantiate TServiceOperatorImpl for the "Comp" service.
class TCompMemoryLimiterPolicy {
public:
    // Policy name string.
    inline static const TString Name = "Comp";
    // false: the BuildProcessGuard overload without a process-id argument is the
    // one enabled for this policy; it takes the id from the operator's own counter.
    constexpr static bool ExternalProcessIdAllocation = false;
};

using TCompMemoryLimiterOperator = TServiceOperatorImpl<TCompMemoryLimiterPolicy>;
Expand Down
Loading