Skip to content

Commit 12789bd

Browse files
scanners unification plain/simple for reuse code (#12847)
1 parent 6c17bb8 commit 12789bd

28 files changed

+1104
-1440
lines changed

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.cpp

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,15 @@ namespace NKikimr::NOlap::NReader::NCommon {
99

1010
TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& commonContext)
1111
: CommonContext(commonContext) {
12-
auto readMetadata = CommonContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>();
13-
Y_ABORT_UNLESS(readMetadata->SelectInfo);
12+
ReadMetadata = CommonContext->GetReadMetadataPtrVerifiedAs<TReadMetadata>();
13+
Y_ABORT_UNLESS(ReadMetadata->SelectInfo);
1414

1515
double kffAccessors = 0.01;
1616
double kffFilter = 0.45;
1717
double kffFetching = 0.45;
1818
double kffMerge = 0.10;
1919
TString stagePrefix;
20-
if (readMetadata->GetEarlyFilterColumnIds().size()) {
20+
if (ReadMetadata->GetEarlyFilterColumnIds().size()) {
2121
stagePrefix = "EF";
2222
kffFilter = 0.7;
2323
kffFetching = 0.15;
@@ -41,15 +41,15 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
4141
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildStageFeatures(stagePrefix + "::MERGE", kffMerge * TGlobalLimits::ScanMemoryLimit)
4242
};
4343
ProcessMemoryGuard =
44-
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(CommonContext->GetReadMetadata()->GetTxId(), stages);
45-
ProcessScopeGuard = NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(
46-
CommonContext->GetReadMetadata()->GetTxId(), GetCommonContext()->GetScanId());
44+
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildProcessGuard(ReadMetadata->GetTxId(), stages);
45+
ProcessScopeGuard =
46+
NGroupedMemoryManager::TScanMemoryLimiterOperator::BuildScopeGuard(ReadMetadata->GetTxId(), GetCommonContext()->GetScanId());
4747

48-
auto readSchema = readMetadata->GetResultSchema();
48+
auto readSchema = ReadMetadata->GetResultSchema();
4949
SpecColumns = std::make_shared<TColumnsSet>(TIndexInfo::GetSnapshotColumnIdsSet(), readSchema);
50-
IndexChecker = readMetadata->GetProgram().GetIndexChecker();
50+
IndexChecker = ReadMetadata->GetProgram().GetIndexChecker();
5151
{
52-
auto predicateColumns = readMetadata->GetPKRangesFilter().GetColumnIds(readMetadata->GetIndexInfo());
52+
auto predicateColumns = ReadMetadata->GetPKRangesFilter().GetColumnIds(ReadMetadata->GetIndexInfo());
5353
if (predicateColumns.size()) {
5454
PredicateColumns = std::make_shared<TColumnsSet>(predicateColumns, readSchema);
5555
} else {
@@ -58,26 +58,26 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
5858
}
5959
{
6060
std::set<ui32> columnIds = { NPortion::TSpecialColumns::SPEC_COL_DELETE_FLAG_INDEX };
61-
DeletionColumns = std::make_shared<TColumnsSet>(columnIds, readMetadata->GetResultSchema());
61+
DeletionColumns = std::make_shared<TColumnsSet>(columnIds, ReadMetadata->GetResultSchema());
6262
}
6363

64-
if (!!readMetadata->GetRequestShardingInfo()) {
64+
if (!!ReadMetadata->GetRequestShardingInfo()) {
6565
auto shardingColumnIds =
66-
readMetadata->GetIndexInfo().GetColumnIdsVerified(readMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
67-
ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, readMetadata->GetResultSchema());
66+
ReadMetadata->GetIndexInfo().GetColumnIdsVerified(ReadMetadata->GetRequestShardingInfo()->GetShardingInfo()->GetColumnNames());
67+
ShardingColumns = std::make_shared<TColumnsSet>(shardingColumnIds, ReadMetadata->GetResultSchema());
6868
} else {
6969
ShardingColumns = std::make_shared<TColumnsSet>();
7070
}
7171
{
72-
auto efColumns = readMetadata->GetEarlyFilterColumnIds();
72+
auto efColumns = ReadMetadata->GetEarlyFilterColumnIds();
7373
if (efColumns.size()) {
7474
EFColumns = std::make_shared<TColumnsSet>(efColumns, readSchema);
7575
} else {
7676
EFColumns = std::make_shared<TColumnsSet>();
7777
}
7878
}
79-
if (readMetadata->HasProcessingColumnIds()) {
80-
FFColumns = std::make_shared<TColumnsSet>(readMetadata->GetProcessingColumnIds(), readSchema);
79+
if (ReadMetadata->HasProcessingColumnIds()) {
80+
FFColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetProcessingColumnIds(), readSchema);
8181
if (SpecColumns->Contains(*FFColumns) && !EFColumns->IsEmpty()) {
8282
FFColumns = std::make_shared<TColumnsSet>(*EFColumns + *SpecColumns);
8383
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("ff_modified", FFColumns->DebugString());
@@ -95,7 +95,7 @@ TSpecialReadContext::TSpecialReadContext(const std::shared_ptr<TReadContext>& co
9595
}
9696
AllUsageColumns = std::make_shared<TColumnsSet>(*FFColumns + *PredicateColumns);
9797

98-
PKColumns = std::make_shared<TColumnsSet>(readMetadata->GetPKColumnIds(), readSchema);
98+
PKColumns = std::make_shared<TColumnsSet>(ReadMetadata->GetPKColumnIds(), readSchema);
9999
MergeColumns = std::make_shared<TColumnsSet>(*PKColumns + *SpecColumns);
100100

101101
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("columns_context_info", DebugString());

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/context.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,15 @@
22
#include "columns_set.h"
33

44
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
5+
#include <ydb/core/tx/columnshard/engines/reader/common_reader/constructor/read_metadata.h>
56
#include <ydb/core/tx/columnshard/engines/scheme/versions/abstract_scheme.h>
67
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
78

89
namespace NKikimr::NOlap::NReader::NCommon {
910

11+
class TFetchingScript;
12+
class IDataSource;
13+
1014
class TSpecialReadContext {
1115
private:
1216
YDB_READONLY_DEF(std::shared_ptr<TReadContext>, CommonContext);
@@ -28,13 +32,36 @@ class TSpecialReadContext {
2832
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FilterStageMemory);
2933
YDB_READONLY_DEF(std::shared_ptr<NGroupedMemoryManager::TStageFeatures>, FetchingStageMemory);
3034

35+
TReadMetadata::TConstPtr ReadMetadata;
3136
TAtomic AbortFlag = 0;
3237

38+
virtual std::shared_ptr<TFetchingScript> DoGetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) = 0;
39+
3340
protected:
3441
NIndexes::TIndexCheckerContainer IndexChecker;
3542
std::shared_ptr<TColumnsSet> EmptyColumns = std::make_shared<TColumnsSet>();
3643

3744
public:
45+
template <class T>
46+
std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<T>& source) {
47+
return GetColumnsFetchingPlan(std::static_pointer_cast<IDataSource>(source));
48+
}
49+
50+
std::shared_ptr<TFetchingScript> GetColumnsFetchingPlan(const std::shared_ptr<IDataSource>& source) {
51+
return DoGetColumnsFetchingPlan(source);
52+
}
53+
54+
const TReadMetadata::TConstPtr& GetReadMetadata() const {
55+
return ReadMetadata;
56+
}
57+
58+
template <class T>
59+
std::shared_ptr<T> GetReadMetadataVerifiedAs() const {
60+
auto result = std::dynamic_pointer_cast<T>(ReadMetadata);
61+
AFL_VERIFY(!!result);
62+
return result;
63+
}
64+
3865
ui64 GetProcessMemoryControlId() const {
3966
AFL_VERIFY(ProcessMemoryGuard);
4067
return ProcessMemoryGuard->GetProcessId();
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
#include "fetch_steps.h"
2+
#include "source.h"
3+
4+
#include <ydb/core/formats/arrow/common/container.h>
5+
#include <ydb/core/tx/columnshard/engines/scheme/abstract/index_info.h>
6+
#include <ydb/core/tx/conveyor/usage/service.h>
7+
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
8+
9+
#include <ydb/library/formats/arrow/simple_arrays_cache.h>
10+
11+
namespace NKikimr::NOlap::NReader::NCommon {
12+
13+
TConclusion<bool> TColumnBlobsFetchingStep::DoExecuteInplace(
14+
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
15+
return !source->StartFetchingColumns(source, step, Columns);
16+
}
17+
18+
ui64 TColumnBlobsFetchingStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
19+
return source->GetColumnBlobBytes(Columns.GetColumnIds());
20+
}
21+
22+
TConclusion<bool> TAssemblerStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
23+
source->AssembleColumns(Columns);
24+
return true;
25+
}
26+
27+
ui64 TAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
28+
return source->GetColumnRawBytes(Columns->GetColumnIds());
29+
}
30+
31+
TConclusion<bool> TOptionalAssemblerStep::DoExecuteInplace(
32+
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
33+
source->AssembleColumns(Columns, !source->IsSourceInMemory());
34+
return true;
35+
}
36+
37+
ui64 TOptionalAssemblerStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const {
38+
return source->GetColumnsVolume(Columns->GetColumnIds(), EMemType::RawSequential);
39+
}
40+
41+
bool TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
42+
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& /*allocation*/) {
43+
auto data = Source.lock();
44+
if (!data || data->GetContext()->IsAborted()) {
45+
guard->Release();
46+
return false;
47+
}
48+
if (StageIndex == EStageFeaturesIndexes::Accessors) {
49+
data->MutableStageData().SetAccessorsGuard(std::move(guard));
50+
} else {
51+
data->RegisterAllocationGuard(std::move(guard));
52+
}
53+
Step.Next();
54+
auto task = std::make_shared<TStepAction>(data, std::move(Step), data->GetContext()->GetCommonContext()->GetScanActorId());
55+
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
56+
return true;
57+
}
58+
59+
TAllocateMemoryStep::TFetchingStepAllocation::TFetchingStepAllocation(
60+
const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step, const EStageFeaturesIndexes stageIndex)
61+
: TBase(mem)
62+
, Source(source)
63+
, Step(step)
64+
, TasksGuard(source->GetContext()->GetCommonContext()->GetCounters().GetResourcesAllocationTasksGuard())
65+
, StageIndex(stageIndex) {
66+
}
67+
68+
void TAllocateMemoryStep::TFetchingStepAllocation::DoOnAllocationImpossible(const TString& errorMessage) {
69+
auto sourcePtr = Source.lock();
70+
if (sourcePtr) {
71+
sourcePtr->GetContext()->GetCommonContext()->AbortWithError(
72+
"cannot allocate memory for step " + Step.GetName() + ": '" + errorMessage + "'");
73+
}
74+
}
75+
76+
TConclusion<bool> TAllocateMemoryStep::DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const {
77+
ui64 size = PredefinedSize.value_or(0);
78+
for (auto&& i : Packs) {
79+
ui32 sizeLocal = source->GetColumnsVolume(i.GetColumns().GetColumnIds(), i.GetMemType());
80+
if (source->GetStageData().GetUseFilter() && i.GetMemType() != EMemType::Blob && source->GetContext()->GetReadMetadata()->HasLimit()) {
81+
const ui32 filtered =
82+
source->GetStageData().GetFilteredCount(source->GetRecordsCount(), source->GetContext()->GetReadMetadata()->GetLimitRobust());
83+
if (filtered < source->GetRecordsCount()) {
84+
sizeLocal = sizeLocal * 1.0 * filtered / source->GetRecordsCount();
85+
}
86+
}
87+
size += sizeLocal;
88+
}
89+
90+
auto allocation = std::make_shared<TFetchingStepAllocation>(source, size, step, StageIndex);
91+
NGroupedMemoryManager::TScanMemoryLimiterOperator::SendToAllocation(source->GetContext()->GetProcessMemoryControlId(),
92+
source->GetContext()->GetCommonContext()->GetScanId(), source->GetMemoryGroupId(), { allocation }, (ui32)StageIndex);
93+
return false;
94+
}
95+
96+
ui64 TAllocateMemoryStep::GetProcessingDataSize(const std::shared_ptr<IDataSource>& /*source*/) const {
97+
return 0;
98+
}
99+
100+
NKikimr::TConclusion<bool> TBuildStageResultStep::DoExecuteInplace(
101+
const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const {
102+
source->BuildStageResult(source);
103+
return true;
104+
}
105+
106+
} // namespace NKikimr::NOlap::NReader::NCommon
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
#pragma once
2+
#include "fetching.h"
3+
4+
#include <ydb/core/tx/limiter/grouped_memory/usage/abstract.h>
5+
6+
namespace NKikimr::NOlap::NReader::NCommon {
7+
8+
class TAllocateMemoryStep: public IFetchingStep {
9+
private:
10+
using TBase = IFetchingStep;
11+
class TColumnsPack {
12+
private:
13+
YDB_READONLY_DEF(TColumnsSetIds, Columns);
14+
YDB_READONLY(EMemType, MemType, EMemType::Blob);
15+
16+
public:
17+
TColumnsPack(const TColumnsSetIds& columns, const EMemType memType)
18+
: Columns(columns)
19+
, MemType(memType) {
20+
}
21+
};
22+
std::vector<TColumnsPack> Packs;
23+
THashMap<ui32, THashSet<EMemType>> Control;
24+
const EStageFeaturesIndexes StageIndex;
25+
const std::optional<ui64> PredefinedSize;
26+
27+
protected:
28+
class TFetchingStepAllocation: public NGroupedMemoryManager::IAllocation {
29+
private:
30+
using TBase = NGroupedMemoryManager::IAllocation;
31+
std::weak_ptr<IDataSource> Source;
32+
TFetchingScriptCursor Step;
33+
NColumnShard::TCounterGuard TasksGuard;
34+
const EStageFeaturesIndexes StageIndex;
35+
virtual bool DoOnAllocated(std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>&& guard,
36+
const std::shared_ptr<NGroupedMemoryManager::IAllocation>& allocation) override;
37+
virtual void DoOnAllocationImpossible(const TString& errorMessage) override;
38+
39+
public:
40+
TFetchingStepAllocation(const std::shared_ptr<IDataSource>& source, const ui64 mem, const TFetchingScriptCursor& step,
41+
const EStageFeaturesIndexes stageIndex);
42+
};
43+
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
44+
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
45+
virtual TString DoDebugString() const override {
46+
return TStringBuilder() << "stage=" << StageIndex << ";";
47+
}
48+
49+
public:
50+
void AddAllocation(const TColumnsSetIds& ids, const EMemType memType) {
51+
if (!ids.GetColumnsCount()) {
52+
return;
53+
}
54+
for (auto&& i : ids.GetColumnIds()) {
55+
AFL_VERIFY(Control[i].emplace(memType).second);
56+
}
57+
Packs.emplace_back(ids, memType);
58+
}
59+
EStageFeaturesIndexes GetStage() const {
60+
return StageIndex;
61+
}
62+
63+
TAllocateMemoryStep(const TColumnsSetIds& columns, const EMemType memType, const EStageFeaturesIndexes stageIndex)
64+
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
65+
, StageIndex(stageIndex) {
66+
AddAllocation(columns, memType);
67+
}
68+
69+
TAllocateMemoryStep(const ui64 memSize, const EStageFeaturesIndexes stageIndex)
70+
: TBase("ALLOCATE_MEMORY::" + ::ToString(stageIndex))
71+
, StageIndex(stageIndex)
72+
, PredefinedSize(memSize) {
73+
}
74+
};
75+
76+
class TAssemblerStep: public IFetchingStep {
77+
private:
78+
using TBase = IFetchingStep;
79+
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
80+
virtual TString DoDebugString() const override {
81+
return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
82+
}
83+
84+
public:
85+
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
86+
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
87+
TAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
88+
: TBase("ASSEMBLER" + (specName ? "::" + specName : ""))
89+
, Columns(columns) {
90+
AFL_VERIFY(Columns);
91+
AFL_VERIFY(Columns->GetColumnsCount());
92+
}
93+
};
94+
95+
class TBuildStageResultStep: public IFetchingStep {
96+
private:
97+
using TBase = IFetchingStep;
98+
99+
public:
100+
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& /*step*/) const override;
101+
TBuildStageResultStep()
102+
: TBase("BUILD_STAGE_RESULT") {
103+
}
104+
};
105+
106+
class TOptionalAssemblerStep: public IFetchingStep {
107+
private:
108+
using TBase = IFetchingStep;
109+
YDB_READONLY_DEF(std::shared_ptr<TColumnsSet>, Columns);
110+
virtual TString DoDebugString() const override {
111+
return TStringBuilder() << "columns=" << Columns->DebugString() << ";";
112+
}
113+
114+
public:
115+
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
116+
117+
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
118+
TOptionalAssemblerStep(const std::shared_ptr<TColumnsSet>& columns, const TString& specName = Default<TString>())
119+
: TBase("OPTIONAL_ASSEMBLER" + (specName ? "::" + specName : ""))
120+
, Columns(columns) {
121+
AFL_VERIFY(Columns);
122+
AFL_VERIFY(Columns->GetColumnsCount());
123+
}
124+
};
125+
126+
class TColumnBlobsFetchingStep: public IFetchingStep {
127+
private:
128+
using TBase = IFetchingStep;
129+
TColumnsSetIds Columns;
130+
131+
protected:
132+
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const override;
133+
virtual TString DoDebugString() const override {
134+
return TStringBuilder() << "columns=" << Columns.DebugString() << ";";
135+
}
136+
137+
public:
138+
virtual ui64 GetProcessingDataSize(const std::shared_ptr<IDataSource>& source) const override;
139+
TColumnBlobsFetchingStep(const TColumnsSetIds& columns)
140+
: TBase("FETCHING_COLUMNS")
141+
, Columns(columns) {
142+
AFL_VERIFY(Columns.GetColumnsCount());
143+
}
144+
};
145+
146+
} // namespace NKikimr::NOlap::NReader::NCommon

0 commit comments

Comments
 (0)