Skip to content

Commit 1f62eb9

Browse files
memory control for graph nodes with specialities (#17301)
1 parent 471d8ac commit 1f62eb9

File tree

24 files changed

+320
-102
lines changed

24 files changed

+320
-102
lines changed

ydb/core/formats/arrow/program/abstract.h

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,47 @@ class TAccessorsCollection;
1313

1414
namespace NKikimr::NArrow::NSSA {
1515

16+
class IMemoryCalculationPolicy {
17+
public:
18+
enum class EStage {
19+
Accessors = 0 /* "ACCESSORS" */,
20+
Filter = 1 /* "FILTER" */,
21+
Fetching = 2 /* "FETCHING" */,
22+
Merge = 3 /* "MERGE" */
23+
};
24+
25+
virtual ~IMemoryCalculationPolicy() = default;
26+
27+
virtual EStage GetStage() const = 0;
28+
virtual ui64 GetReserveMemorySize(
29+
const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const = 0;
30+
};
31+
32+
class TFilterCalculationPolicy: public IMemoryCalculationPolicy {
33+
public:
34+
virtual EStage GetStage() const override {
35+
return EStage::Filter;
36+
}
37+
virtual ui64 GetReserveMemorySize(
38+
const ui64 blobsSize, const ui64 /*rawSize*/, const std::optional<ui32> /*limit*/, const ui32 /*recordsCount*/) const override {
39+
return blobsSize;
40+
}
41+
};
42+
43+
class TFetchingCalculationPolicy: public IMemoryCalculationPolicy {
44+
public:
45+
virtual EStage GetStage() const override {
46+
return EStage::Fetching;
47+
}
48+
virtual ui64 GetReserveMemorySize(const ui64 blobsSize, const ui64 rawSize, const std::optional<ui32> limit, const ui32 recordsCount) const override {
49+
if (limit) {
50+
return std::max<ui64>(blobsSize, rawSize * (1.0 * *limit) / recordsCount);
51+
} else {
52+
return std::max<ui64>(blobsSize, rawSize);
53+
}
54+
}
55+
};
56+
1657
class TIndexCheckOperation {
1758
public:
1859
enum class EOperation : ui32 {
@@ -214,7 +255,8 @@ enum class EProcessorType {
214255
AssembleOriginalData,
215256
CheckIndexData,
216257
CheckHeaderData,
217-
StreamLogic
258+
StreamLogic,
259+
ReserveMemory
218260
};
219261

220262
class TFetchingInfo {
@@ -303,6 +345,13 @@ class IResourceProcessor {
303345
Input.emplace_back(TColumnChainInfo(resourceId));
304346
}
305347

348+
void AddOutput(const ui32 resourceId) {
349+
for (auto&& i : Output) {
350+
AFL_VERIFY(i.GetColumnId() != resourceId);
351+
}
352+
Output.emplace_back(TColumnChainInfo(resourceId));
353+
}
354+
306355
void RemoveInput(const ui32 resourceId) {
307356
for (ui32 idx = 0; idx < Input.size(); ++idx) {
308357
if (Input[idx].GetColumnId() == resourceId) {

ydb/core/formats/arrow/program/execution.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,9 +276,23 @@ class IDataSource {
276276
virtual TConclusion<bool> DoStartFetch(
277277
const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>& fetchers) = 0;
278278

279+
virtual TConclusion<bool> DoStartReserveMemory(const NArrow::NSSA::TProcessorContext& /*context*/,
280+
const THashMap<ui32, IDataSource::TDataAddress>& /*columns*/, const THashMap<ui32, IDataSource::TFetchIndexContext>& /*indexes*/,
281+
const THashMap<ui32, IDataSource::TFetchHeaderContext>& /*headers*/,
282+
const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& /*policy*/) {
283+
return false;
284+
}
285+
279286
public:
280287
virtual ~IDataSource() = default;
281288

289+
TConclusion<bool> StartReserveMemory(const NArrow::NSSA::TProcessorContext& context,
290+
const THashMap<ui32, IDataSource::TDataAddress>& columns, const THashMap<ui32, IDataSource::TFetchIndexContext>& indexes,
291+
const THashMap<ui32, IDataSource::TFetchHeaderContext>& headers, const std::shared_ptr<NArrow::NSSA::IMemoryCalculationPolicy>& policy) {
292+
AFL_VERIFY(policy);
293+
return DoStartReserveMemory(context, columns, indexes, headers, policy);
294+
}
295+
282296
TConclusion<bool> StartFetch(
283297
const NArrow::NSSA::TProcessorContext& context, const std::vector<std::shared_ptr<NArrow::NSSA::IFetchLogic>>& fetchers) {
284298
return DoStartFetch(context, fetchers);

ydb/core/formats/arrow/program/graph_execute.cpp

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol
104104
if (i.second->GetProcessor()->GetProcessorType() == EProcessorType::Filter) {
105105
AFL_VERIFY(!IsFilterRoot(i.second->GetIdentifier()));
106106
FilterRoot.emplace_back(i.second);
107-
} else if (i.second->GetProcessor()->GetProcessorType() != EProcessorType::Const) {
107+
} else if (i.second->GetProcessor()->GetProcessorType() == EProcessorType::Projection) {
108108
AFL_VERIFY(!ResultRoot)("debug", DebugDOT());
109109
ResultRoot = i.second;
110110
} else {
@@ -124,6 +124,9 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol
124124
for (; it->IsValid(); it->Next()) {
125125
it->MutableCurrentNode().SetSequentialIdx(currentIndex);
126126
for (auto&& i : it->GetProcessorVerified()->GetInput()) {
127+
if (!i.GetColumnId()) {
128+
continue;
129+
}
127130
if (resolver.HasColumn(i.GetColumnId())) {
128131
if (IsFilterRoot(it->GetCurrentGraphNode()->GetIdentifier())) {
129132
FilterColumns.emplace(i.GetColumnId());
@@ -133,6 +136,9 @@ TCompiledGraph::TCompiledGraph(const NOptimization::TGraph& original, const ICol
133136
usage[i.GetColumnId()].InUsage(currentIndex);
134137
}
135138
for (auto&& i : it->GetProcessorVerified()->GetOutput()) {
139+
if (!i.GetColumnId()) {
140+
continue;
141+
}
136142
usage[i.GetColumnId()].Constructed(currentIndex);
137143
}
138144
sortedNodes.emplace_back(&it->MutableCurrentNode());

ydb/core/formats/arrow/program/graph_optimization.cpp

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "header.h"
66
#include "index.h"
77
#include "original.h"
8+
#include "reserve.h"
89
#include "stream_logic.h"
910

1011
#include <ydb/library/arrow_kernels/operations.h>
@@ -206,7 +207,8 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
206207
}
207208
if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size() +
208209
i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetIndexContext().size() +
209-
i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() > 1) {
210+
i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetHeaderContext().size() >
211+
1) {
210212
continue;
211213
}
212214
if (i.second->GetProcessorAs<TOriginalColumnDataProcessor>()->GetDataAddresses().size()) {
@@ -220,6 +222,7 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
220222
}
221223
}
222224
bool changed = false;
225+
TGraphNode* nodeFetch = nullptr;
223226
if (dataAddresses.size() > 1) {
224227
THashSet<ui32> columnIds;
225228
for (auto&& i : dataAddresses) {
@@ -231,16 +234,32 @@ TConclusion<bool> TGraph::OptimizeMergeFetching(TGraphNode* baseNode) {
231234
proc->Add(addr.second);
232235
}
233236
}
234-
auto nodeFetch = AddNode(proc);
237+
nodeFetch = AddNode(proc).get();
235238
FetchersMerged.emplace(nodeFetch->GetIdentifier());
236239
for (auto&& i : dataAddresses) {
237240
for (auto&& to : i->GetOutputEdges()) {
238-
AddEdge(nodeFetch.get(), to.second, to.first.GetResourceId());
241+
AddEdge(nodeFetch, to.second, to.first.GetResourceId());
239242
}
240243
RemoveNode(i->GetIdentifier());
241244
}
242245
changed = true;
246+
} else if (dataAddresses.size() == 1) {
247+
nodeFetch = dataAddresses.front();
243248
}
249+
if (nodeFetch) {
250+
std::shared_ptr<IMemoryCalculationPolicy> policy;
251+
if (baseNode->Is(EProcessorType::Filter)) {
252+
policy = std::make_shared<TFilterCalculationPolicy>();
253+
} else if (baseNode->Is(EProcessorType::Projection)) {
254+
policy = std::make_shared<TFetchingCalculationPolicy>();
255+
}
256+
auto reserveMemory = std::make_shared<TReserveMemoryProcessor>(*nodeFetch->GetProcessorAs<TOriginalColumnDataProcessor>(), policy);
257+
auto nodeReserve = AddNode(reserveMemory);
258+
nodeReserve->GetProcessor()->AddOutput(0);
259+
nodeFetch->GetProcessor()->AddInput(0);
260+
AddEdge(nodeReserve.get(), nodeFetch, 0);
261+
}
262+
244263
if (indexes.size() + headers.size() > 1) {
245264
THashSet<ui32> columnIds;
246265
for (auto&& i : indexes) {
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
#include "execution.h"
2+
#include "original.h"
3+
4+
namespace NKikimr::NArrow::NSSA {
5+
6+
} // namespace NKikimr::NArrow::NSSA
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
#pragma once
2+
#include "abstract.h"
3+
#include "original.h"
4+
5+
namespace NKikimr::NArrow::NSSA {
6+
7+
class TReserveMemoryProcessor: public IResourceProcessor {
8+
private:
9+
using TBase = IResourceProcessor;
10+
11+
THashMap<ui32, IDataSource::TDataAddress> DataAddresses;
12+
THashMap<ui32, IDataSource::TFetchIndexContext> IndexContext;
13+
THashMap<ui32, IDataSource::TFetchHeaderContext> HeaderContext;
14+
std::shared_ptr<IMemoryCalculationPolicy> Policy;
15+
16+
virtual NJson::TJsonValue DoDebugJson() const override {
17+
NJson::TJsonValue result = NJson::JSON_MAP;
18+
if (DataAddresses.size()) {
19+
auto& arrAddr = result.InsertValue("data", NJson::JSON_ARRAY);
20+
for (auto&& i : DataAddresses) {
21+
arrAddr.AppendValue(i.second.DebugJson());
22+
}
23+
}
24+
if (IndexContext.size()) {
25+
auto& indexesArr = result.InsertValue("indexes", NJson::JSON_ARRAY);
26+
for (auto&& i : IndexContext) {
27+
indexesArr.AppendValue(i.second.DebugJson());
28+
}
29+
}
30+
if (HeaderContext.size()) {
31+
auto& headersArr = result.InsertValue("headers", NJson::JSON_ARRAY);
32+
for (auto&& i : HeaderContext) {
33+
headersArr.AppendValue(i.second.DebugJson());
34+
}
35+
}
36+
return result;
37+
}
38+
39+
virtual TConclusion<EExecutionResult> DoExecute(const TProcessorContext& context, const TExecutionNodeContext& /*nodeContext*/) const override {
40+
auto source = context.GetDataSource().lock();
41+
if (!source) {
42+
return TConclusionStatus::Fail("source was destroyed before (original fetch start)");
43+
}
44+
auto conclusion = source->StartReserveMemory(context, DataAddresses, IndexContext, HeaderContext, Policy);
45+
if (conclusion.IsFail()) {
46+
return conclusion;
47+
} else if (conclusion.GetResult()) {
48+
return EExecutionResult::InBackground;
49+
} else {
50+
return EExecutionResult::Success;
51+
}
52+
}
53+
54+
virtual bool IsAggregation() const override {
55+
return false;
56+
}
57+
58+
virtual ui64 DoGetWeight() const override {
59+
return 0;
60+
}
61+
62+
public:
63+
TReserveMemoryProcessor(const TOriginalColumnDataProcessor& original, const std::shared_ptr<IMemoryCalculationPolicy>& policy)
64+
: TBase({}, {}, EProcessorType::ReserveMemory)
65+
, DataAddresses(original.GetDataAddresses())
66+
, IndexContext(original.GetIndexContext())
67+
, HeaderContext(original.GetHeaderContext())
68+
, Policy(policy)
69+
{
70+
AFL_VERIFY(policy);
71+
}
72+
};
73+
74+
} // namespace NKikimr::NArrow::NSSA

ydb/core/formats/arrow/program/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ SRCS(
4848
assign_internal.cpp
4949
custom_registry.cpp
5050
GLOBAL kernel_logic.cpp
51+
reserve.cpp
5152
)
5253

5354
GENERATE_ENUM_SERIALIZATION(abstract.h)

ydb/core/formats/arrow/ut/ut_program_step.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -594,7 +594,7 @@ Y_UNIT_TEST_SUITE(ProgramStep) {
594594
builder.Add(std::make_shared<TProjectionProcessor>(TColumnChainInfo::BuildVector({ 1, 2 })));
595595
auto chain = builder.Finish().DetachResult();
596596
Cerr << chain->DebugDOT() << Endl;
597-
AFL_VERIFY(chain->DebugStats() == "[TOTAL:Const:2;Calculation:4;Projection:1;Filter:1;FetchOriginalData:2;AssembleOriginalData:3;CheckIndexData:1;StreamLogic:1;];SUB:[AssembleOriginalData:1;];")("debug", chain->DebugStats());
597+
AFL_VERIFY(chain->DebugStats() == "[TOTAL:Const:2;Calculation:4;Projection:1;Filter:1;FetchOriginalData:2;AssembleOriginalData:3;CheckIndexData:1;StreamLogic:1;ReserveMemory:1;];SUB:[AssembleOriginalData:1;];")("debug", chain->DebugStats());
598598
}
599599

600600
Y_UNIT_TEST(Projection) {

ydb/core/kqp/ut/common/kqp_ut_common.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,8 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
559559
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
560560
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD_SCAN);
561561
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);
562+
SetupLogLevelFromTestParam(NKikimrServices::SSA_GRAPH_EXECUTION);
563+
562564

563565
RunCall([this, domain = settings.DomainRoot]{
564566
this->Client->InitRootScheme(domain);
@@ -1556,7 +1558,7 @@ NJson::TJsonValue SimplifyPlan(NJson::TJsonValue& opt, const TGetPlanParams& par
15561558
opName.find("Join") != TString::npos ||
15571559
opName.find("Union") != TString::npos ||
15581560
(opName.find("Filter") != TString::npos && params.IncludeFilters) ||
1559-
(opName.find("HashShuffle") != TString::npos && params.IncludeShuffles)
1561+
(opName.find("HashShuffle") != TString::npos && params.IncludeShuffles)
15601562
) {
15611563
NJson::TJsonValue newChildren;
15621564

ydb/core/tx/columnshard/engines/reader/common_reader/iterator/columns_set.h

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,6 @@ enum class EMemType {
1414
RawSequential
1515
};
1616

17-
enum class EStageFeaturesIndexes {
18-
Accessors = 0,
19-
Filter = 1,
20-
Fetching = 2,
21-
Merge = 3
22-
};
23-
2417
class TIndexesSet {
2518
private:
2619
YDB_READONLY_DEF(std::vector<ui32>, IndexIds);

0 commit comments

Comments
 (0)