Skip to content

Commit 344960e

Browse files
authored
column assembling in data fetching on cs (#20649)
1 parent 2674df3 commit 344960e

File tree

15 files changed

+245
-90
lines changed

15 files changed

+245
-90
lines changed

ydb/core/tx/columnshard/blobs_action/abstract/read.h

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,14 +70,22 @@ class TActionReadBlobs {
7070
return it->second;
7171
}
7272

73-
TString Extract(const TBlobRange& bRange) {
73+
std::optional<TString> ExtractOptional(const TBlobRange& bRange) {
7474
auto it = Blobs.find(bRange);
75-
AFL_VERIFY(it != Blobs.end())("range", bRange.ToString());
75+
if (it == Blobs.end()) {
76+
return std::nullopt;
77+
}
7678
TString result = it->second;
7779
Blobs.erase(it);
7880
return result;
7981
}
8082

83+
TString ExtractVerified(const TBlobRange& bRange) {
84+
auto result = ExtractOptional(bRange);
85+
AFL_VERIFY(result)("range", bRange.ToString());
86+
return std::move(*result);
87+
}
88+
8189
bool IsEmpty() const {
8290
return Blobs.empty();
8391
}

ydb/core/tx/columnshard/blobs_reader/task.h

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,15 +63,25 @@ class TCompositeReadBlobs {
6363
}
6464
return it->second.GetBlobRangeOptional(range);
6565
}
66-
TString Extract(const TString& storageId, const TBlobRange& range) {
66+
std::optional<TString> ExtractOptional(const TString& storageId, const TBlobRange& range) {
6767
auto it = BlobsByStorage.find(storageId);
68-
AFL_VERIFY(it != BlobsByStorage.end())("range", range.ToString())("storage_id", storageId);
69-
auto result = it->second.Extract(range);
68+
if (it == BlobsByStorage.end()) {
69+
return std::nullopt;
70+
}
71+
auto result = it->second.ExtractOptional(range);
72+
if (!result) {
73+
return std::nullopt;
74+
}
7075
if (it->second.IsEmpty()) {
7176
BlobsByStorage.erase(it);
7277
}
7378
return result;
7479
}
80+
TString ExtractVerified(const TString& storageId, const TBlobRange& range) {
81+
auto result = ExtractOptional(storageId, range);
82+
AFL_VERIFY(result)("range", range.ToString())("storage_id", storageId);
83+
return std::move(*result);
84+
}
7585

7686
ui64 GetTotalBlobsSize() const {
7787
ui64 result = 0;

ydb/core/tx/columnshard/columnshard__statistics.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,7 @@ class TColumnPortionsAccumulator {
141141
for (auto&& [columnId, data] : RangesByColumn) {
142142
for (auto&& [storageId, blobs] : data) {
143143
for (auto&& b : blobs) {
144-
const TString blob = blobsData.Extract(storageId, b);
144+
const TString blob = blobsData.ExtractVerified(storageId, b);
145145
auto sketch = std::unique_ptr<TCountMinSketch>(TCountMinSketch::FromString(blob.data(), blob.size()));
146146
auto it = SketchesByColumns.find(columnId);
147147
AFL_VERIFY(it != SketchesByColumns.end());

ydb/core/tx/columnshard/columnshard_impl.cpp

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -510,10 +510,6 @@ class TCompactionExecutor: public NOlap::NDataFetcher::IFetchCallback {
510510
const bool NeedBlobs = true;
511511
const std::shared_ptr<TAtomicCounter> TabletActivity;
512512

513-
virtual std::optional<ui64> GetMemoryForUsage() const override {
514-
return Changes->CalcMemoryForUsage();
515-
}
516-
517513
virtual bool IsAborted() const override {
518514
return !TabletActivity->Val();
519515
}
@@ -549,9 +545,9 @@ class TCompactionExecutor: public NOlap::NDataFetcher::IFetchCallback {
549545
NActors::TLogContextGuard g(
550546
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", TabletId)("parent_id", ParentActorId));
551547
if (NeedBlobs) {
552-
AFL_VERIFY(context.GetResourceGuards().size() == 2);
548+
AFL_VERIFY(context.GetResourceGuards().size() == 3);
553549
} else {
554-
AFL_VERIFY(context.GetResourceGuards().size() == 1);
550+
AFL_VERIFY(context.GetResourceGuards().size() == 2);
555551
}
556552
if (NeedBlobs) {
557553
Changes->Blobs = context.ExtractBlobs();

ydb/core/tx/columnshard/data_reader/contexts.cpp

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@ IFetchingStep::EStepResult IFetchingStep::Execute(const std::shared_ptr<TPortion
1414
}
1515

1616
TRequestInput::TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<TVersionedIndex>& versions,
17-
const NBlobOperations::EConsumer consumer, const TString& externalTaskId)
17+
const NBlobOperations::EConsumer consumer, const TString& externalTaskId,
18+
const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo)
1819
: Consumer(consumer)
19-
, ExternalTaskId(externalTaskId) {
20+
, ExternalTaskId(externalTaskId)
21+
, MemoryProcessInfo(memoryProcessInfo ? *memoryProcessInfo : TFetcherMemoryProcessInfo())
22+
{
2023
AFL_VERIFY(portions.size());
2124
ActualSchema = versions->GetLastSchema();
2225
for (auto&& i : portions) {

ydb/core/tx/columnshard/data_reader/contexts.h

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -21,27 +21,40 @@ enum class EFetchingStage : ui32 {
2121
Error
2222
};
2323

24+
class TFetcherMemoryProcessInfo {
25+
private:
26+
static inline TAtomicCounter Counter = 0;
27+
ui64 MemoryProcessId = Counter.Inc();
28+
29+
public:
30+
ui64 GetMemoryProcessId() const {
31+
return MemoryProcessId;
32+
}
33+
};
34+
2435
class TCurrentContext: TMoveOnly {
2536
private:
2637
std::optional<std::vector<TPortionDataAccessor>> Accessors;
2738
YDB_READONLY_DEF(std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>, ResourceGuards);
2839
std::shared_ptr<NGroupedMemoryManager::TProcessGuard> MemoryProcessGuard;
2940
std::shared_ptr<NGroupedMemoryManager::TScopeGuard> MemoryProcessScopeGuard;
3041
std::shared_ptr<NGroupedMemoryManager::TGroupGuard> MemoryProcessGroupGuard;
31-
static inline TAtomicCounter Counter = 0;
32-
const ui64 MemoryProcessId = Counter.Inc();
42+
TFetcherMemoryProcessInfo MemoryProcessInfo;
3343
std::optional<NBlobOperations::NRead::TCompositeReadBlobs> Blobs;
44+
std::optional<std::vector<NArrow::TGeneralContainer>> AssembledData;
3445

3546
public:
3647
ui64 GetMemoryProcessId() const {
37-
return MemoryProcessId;
48+
return MemoryProcessInfo.GetMemoryProcessId();
3849
}
3950

4051
ui64 GetMemoryScopeId() const {
52+
AFL_VERIFY(!!MemoryProcessScopeGuard);
4153
return MemoryProcessScopeGuard->GetScopeId();
4254
}
4355

4456
ui64 GetMemoryGroupId() const {
57+
AFL_VERIFY(!!MemoryProcessGroupGuard);
4558
return MemoryProcessGroupGuard->GetGroupId();
4659
}
4760

@@ -66,21 +79,35 @@ class TCurrentContext: TMoveOnly {
6679
Blobs.reset();
6780
}
6881

69-
TCurrentContext() {
82+
void SetAssembledData(std::vector<NArrow::TGeneralContainer>&& data) {
83+
AFL_VERIFY(!AssembledData);
84+
AssembledData = std::move(data);
85+
}
86+
87+
std::vector<NArrow::TGeneralContainer> ExtractAssembledData() {
88+
AFL_VERIFY(!!AssembledData);
89+
auto result = std::move(*AssembledData);
90+
AssembledData.reset();
91+
return result;
92+
}
93+
94+
TCurrentContext(const TFetcherMemoryProcessInfo& memoryProcessInfo)
95+
: MemoryProcessInfo(memoryProcessInfo)
96+
{
7097
static std::shared_ptr<NGroupedMemoryManager::TStageFeatures> stageFeatures =
7198
NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildStageFeatures("DEFAULT", 1000000000);
7299

73-
MemoryProcessGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard(MemoryProcessId, { stageFeatures });
74-
MemoryProcessScopeGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildScopeGuard(MemoryProcessId, 1);
75-
MemoryProcessGroupGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildGroupGuard(MemoryProcessId, 1);
100+
MemoryProcessGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildProcessGuard(GetMemoryProcessId(), { stageFeatures });
101+
MemoryProcessScopeGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildScopeGuard(GetMemoryProcessId(), 1);
102+
MemoryProcessGroupGuard = NGroupedMemoryManager::TCompMemoryLimiterOperator::BuildGroupGuard(GetMemoryProcessId(), 1);
76103
}
77104

78105
void SetPortionAccessors(std::vector<TPortionDataAccessor>&& acc) {
79106
AFL_VERIFY(!Accessors);
80107
Accessors = std::move(acc);
81108
}
82109

83-
const std::vector<TPortionDataAccessor> GetPortionAccessors() const {
110+
const std::vector<TPortionDataAccessor>& GetPortionAccessors() const {
84111
AFL_VERIFY(Accessors);
85112
return *Accessors;
86113
}
@@ -107,20 +134,8 @@ class IFetchCallback {
107134
virtual ~IFetchCallback() = default;
108135

109136
virtual ui64 GetNecessaryDataMemory(
110-
const std::shared_ptr<NReader::NCommon::TColumnsSetIds>& columnIds, const std::vector<TPortionDataAccessor>& acc) const {
111-
ui64 memory = 0;
112-
for (auto&& a : acc) {
113-
if (columnIds) {
114-
memory += a.GetColumnBlobBytes(columnIds->GetColumnIds());
115-
} else {
116-
memory += a.GetPortionInfo().GetTotalBlobBytes();
117-
}
118-
}
119-
return memory;
120-
}
121-
122-
virtual std::optional<ui64> GetMemoryForUsage() const {
123-
return std::nullopt;
137+
const std::shared_ptr<NReader::NCommon::TColumnsSetIds>& /*columnIds*/, const std::vector<TPortionDataAccessor>& /*acc*/) const {
138+
return 0;
124139
}
125140

126141
virtual bool IsAborted() const = 0;
@@ -246,10 +261,12 @@ class TRequestInput {
246261
YDB_READONLY_DEF(std::shared_ptr<ISnapshotSchema>, ActualSchema);
247262
YDB_READONLY(NBlobOperations::EConsumer, Consumer, NBlobOperations::EConsumer::UNDEFINED);
248263
YDB_READONLY_DEF(TString, ExternalTaskId);
264+
YDB_READONLY_DEF(TFetcherMemoryProcessInfo, MemoryProcessInfo);
249265

250266
public:
251267
TRequestInput(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<TVersionedIndex>& versions,
252-
const NBlobOperations::EConsumer consumer, const TString& externalTaskId);
268+
const NBlobOperations::EConsumer consumer, const TString& externalTaskId,
269+
const std::optional<TFetcherMemoryProcessInfo>& memoryProcessInfo = std::nullopt);
253270
};
254271

255272
} // namespace NKikimr::NOlap::NDataFetcher

ydb/core/tx/columnshard/data_reader/fetcher.cpp

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,33 @@
33

44
namespace NKikimr::NOlap::NDataFetcher {
55

6-
void TPortionsDataFetcher::StartColumnsFetching(TRequestInput&& input, std::shared_ptr<NReader::NCommon::TColumnsSetIds>& entityIds,
6+
void TPortionsDataFetcher::StartAssembledColumnsFetching(TRequestInput&& input,
7+
const std::shared_ptr<NReader::NCommon::TColumnsSetIds>& entityIds, std::shared_ptr<IFetchCallback>&& callback,
8+
const std::shared_ptr<TEnvironment>& environment, const NConveyorComposite::ESpecialTaskCategory conveyorCategory) {
9+
std::shared_ptr<TScript> script = [&]() {
10+
std::vector<std::shared_ptr<IFetchingStep>> steps;
11+
steps.emplace_back(std::make_shared<TAskAccessorResourcesStep>());
12+
steps.emplace_back(std::make_shared<TAskAccessorsStep>());
13+
steps.emplace_back(std::make_shared<TAskRawDataResourceStep>(entityIds));
14+
steps.emplace_back(std::make_shared<TAskDataStep>(entityIds));
15+
steps.emplace_back(std::make_shared<TAssembleDataStep>());
16+
steps.emplace_back(std::make_shared<TAskUsageResourceStep>(entityIds));
17+
return std::make_shared<TScript>(std::move(steps), "ASSEMBLED_PARTIAL_PORTIONS_FETCHING::" + ::ToString(input.GetConsumer()));
18+
}();
19+
auto fetcher = std::make_shared<TPortionsDataFetcher>(std::move(input), std::move(callback), environment, script, conveyorCategory);
20+
fetcher->Resume(fetcher);
21+
}
22+
23+
void TPortionsDataFetcher::StartColumnsFetching(TRequestInput&& input, const std::shared_ptr<NReader::NCommon::TColumnsSetIds>& entityIds,
724
std::shared_ptr<IFetchCallback>&& callback, const std::shared_ptr<TEnvironment>& environment,
825
const NConveyorComposite::ESpecialTaskCategory conveyorCategory) {
926
std::shared_ptr<TScript> script = [&]() {
1027
std::vector<std::shared_ptr<IFetchingStep>> steps;
1128
steps.emplace_back(std::make_shared<TAskAccessorResourcesStep>());
1229
steps.emplace_back(std::make_shared<TAskAccessorsStep>());
13-
steps.emplace_back(std::make_shared<TAskDataResourceStep>(entityIds));
30+
steps.emplace_back(std::make_shared<TAskBlobDataResourceStep>(entityIds));
1431
steps.emplace_back(std::make_shared<TAskDataStep>(entityIds));
32+
steps.emplace_back(std::make_shared<TAskUsageResourceStep>(entityIds));
1533
return std::make_shared<TScript>(std::move(steps), "PARTIAL_PORTIONS_FETCHING::" + ::ToString(input.GetConsumer()));
1634
}();
1735
auto fetcher = std::make_shared<TPortionsDataFetcher>(std::move(input), std::move(callback), environment, script, conveyorCategory);
@@ -24,8 +42,9 @@ void TPortionsDataFetcher::StartFullPortionsFetching(TRequestInput&& input, std:
2442
std::vector<std::shared_ptr<IFetchingStep>> steps;
2543
steps.emplace_back(std::make_shared<TAskAccessorResourcesStep>());
2644
steps.emplace_back(std::make_shared<TAskAccessorsStep>());
27-
steps.emplace_back(std::make_shared<TAskDataResourceStep>(nullptr));
45+
steps.emplace_back(std::make_shared<TAskBlobDataResourceStep>(nullptr));
2846
steps.emplace_back(std::make_shared<TAskDataStep>(nullptr));
47+
steps.emplace_back(std::make_shared<TAskUsageResourceStep>(nullptr));
2948
return std::make_shared<TScript>(std::move(steps), "FULL_PORTIONS_FETCHING::" + ::ToString(input.GetConsumer()));
3049
}();
3150
auto fetcher = std::make_shared<TPortionsDataFetcher>(std::move(input), std::move(callback), environment, script, conveyorCategory);
@@ -38,6 +57,7 @@ void TPortionsDataFetcher::StartAccessorPortionsFetching(TRequestInput&& input,
3857
std::vector<std::shared_ptr<IFetchingStep>> steps;
3958
steps.emplace_back(std::make_shared<TAskAccessorResourcesStep>());
4059
steps.emplace_back(std::make_shared<TAskAccessorsStep>());
60+
steps.emplace_back(std::make_shared<TAskUsageResourceStep>(nullptr));
4161
return std::make_shared<TScript>(std::move(steps), "ACCESSOR_PORTIONS_FETCHING::" + ::ToString(input.GetConsumer()));
4262
}();
4363
auto fetcher = std::make_shared<TPortionsDataFetcher>(std::move(input), std::move(callback), environment, script, conveyorCategory);

ydb/core/tx/columnshard/data_reader/fetcher.h

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,10 @@ class TPortionsDataFetcher: TNonCopyable {
8080
, ClassCounters(Singleton<TCounters>()->GetClassCounters(Callback->GetClassName()))
8181
, Guard(ClassCounters->GetGuard(EFetchingStage::Created))
8282
, Script(script)
83+
, CurrentContext(input.GetMemoryProcessInfo())
8384
, Environment(environment)
84-
, ConveyorCategory(conveyorCategory) {
85+
, ConveyorCategory(conveyorCategory)
86+
{
8587
AFL_VERIFY(Environment);
8688
AFL_VERIFY(Callback);
8789
}
@@ -97,7 +99,11 @@ class TPortionsDataFetcher: TNonCopyable {
9799
static void StartFullPortionsFetching(TRequestInput&& input, std::shared_ptr<IFetchCallback>&& callback,
98100
const std::shared_ptr<TEnvironment>& environment, const NConveyorComposite::ESpecialTaskCategory conveyorCategory);
99101

100-
static void StartColumnsFetching(TRequestInput&& input, std::shared_ptr<NReader::NCommon::TColumnsSetIds>& entityIds,
102+
static void StartColumnsFetching(TRequestInput&& input, const std::shared_ptr<NReader::NCommon::TColumnsSetIds>& entityIds,
103+
std::shared_ptr<IFetchCallback>&& callback, const std::shared_ptr<TEnvironment>& environment,
104+
const NConveyorComposite::ESpecialTaskCategory conveyorCategory);
105+
106+
static void StartAssembledColumnsFetching(TRequestInput&& input, const std::shared_ptr<NReader::NCommon::TColumnsSetIds>& entityIds,
101107
std::shared_ptr<IFetchCallback>&& callback, const std::shared_ptr<TEnvironment>& environment,
102108
const NConveyorComposite::ESpecialTaskCategory conveyorCategory);
103109

0 commit comments

Comments
 (0)