Skip to content

Commit 38b43f6

Browse files
ivanmorozov333ivanmorozov333
andauthored
compaction reinit (#19381)
Co-authored-by: ivanmorozov333 <imorozov333@ya.ru>
1 parent 5e18f9f commit 38b43f6

File tree

5 files changed

+43
-21
lines changed

5 files changed

+43
-21
lines changed

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/abstract.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ class TLimitsOverloadChecker: public IOverloadChecker {
321321

322322
class IPortionsLevel {
323323
private:
324-
virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) = 0;
324+
virtual std::vector<TPortionInfo::TPtr> DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) = 0;
325325
virtual ui64 DoGetWeight() const = 0;
326326
virtual TInstant DoGetWeightExpirationInstant() const = 0;
327327
virtual NArrow::NMerger::TIntervalPositions DoGetBucketPositions(const std::shared_ptr<arrow::Schema>& pkSchema) const = 0;
@@ -451,7 +451,7 @@ class IPortionsLevel {
451451
return DoGetAffectedPortionBytes(from, to);
452452
}
453453

454-
void ModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) {
454+
[[nodiscard]] std::vector<TPortionInfo::TPtr> ModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) {
455455
std::vector<TPortionInfo::TPtr> addSelective;
456456
std::vector<TPortionInfo::TPtr> removeSelective;
457457
for (ui32 idx = 0; idx < Selectors.size(); ++idx) {

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.cpp

Lines changed: 22 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets {
44

5-
void TOneLayerPortions::DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) {
5+
std::vector<TPortionInfo::TPtr> TOneLayerPortions::DoModifyPortions(
6+
const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) {
7+
std::vector<TPortionInfo::TPtr> problems;
68
for (auto&& i : remove) {
79
auto it = Portions.find(TOrderedPortion(i));
810
AFL_VERIFY(it != Portions.end());
@@ -18,26 +20,34 @@ void TOneLayerPortions::DoModifyPortions(const std::vector<TPortionInfo::TPtr>&
1820
{
1921
auto it = info.first;
2022
++it;
21-
if (it != Portions.end()) {
22-
AFL_VERIFY(i->IndexKeyEnd() < it->GetStart())("start", i->IndexKeyStart().DebugString())("end", i->IndexKeyEnd().DebugString())(
23-
"next", it->GetStart().DebugString())("next1", it->GetStart().DebugString())(
24-
"next2", it->GetPortion()->IndexKeyEnd().DebugString())("level_id", GetLevelId())(
25-
"portion_id_new", i->GetPortionId())("portion_id_old", it->GetPortion()->GetPortionId())(
26-
"portion_old", it->GetPortion()->DebugString())("add", sb);
23+
if (it != Portions.end() && it->GetStart() <= i->IndexKeyEnd()) {
24+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("start", i->IndexKeyStart().DebugString())(
25+
"end", i->IndexKeyEnd().DebugString())("next", it->GetStart().DebugString())("next1", it->GetStart().DebugString())(
26+
"next2", it->GetPortion()->IndexKeyEnd().DebugString())("level_id", GetLevelId())("portion_id_new", i->GetPortionId())(
27+
"portion_id_old", it->GetPortion()->GetPortionId())("portion_old", it->GetPortion()->DebugString())("add", sb);
28+
problems.emplace_back(i);
29+
Portions.erase(info.first);
30+
continue;
2731
}
2832
}
2933
{
3034
auto it = info.first;
3135
if (it != Portions.begin()) {
3236
--it;
33-
AFL_VERIFY(it->GetPortion()->IndexKeyEnd() < i->IndexKeyStart())
34-
("start", i->IndexKeyStart().DebugString())("finish", i->IndexKeyEnd().DebugString())("pred_start",
35-
it->GetPortion()->IndexKeyStart().DebugString())("pred_finish", it->GetPortion()->IndexKeyEnd().DebugString())("level_id", GetLevelId())(
36-
"portion_id_new", i->GetPortionId())("portion_id_old", it->GetPortion()->GetPortionId())("add", sb);
37+
if (i->IndexKeyStart() <= it->GetPortion()->IndexKeyEnd()) {
38+
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_COMPACTION)("start", i->IndexKeyStart().DebugString())(
39+
"finish", i->IndexKeyEnd().DebugString())("pred_start", it->GetPortion()->IndexKeyStart().DebugString())(
40+
"pred_finish", it->GetPortion()->IndexKeyEnd().DebugString())("level_id", GetLevelId())(
41+
"portion_id_new", i->GetPortionId())("portion_id_old", it->GetPortion()->GetPortionId())("add", sb);
42+
problems.emplace_back(i);
43+
Portions.erase(info.first);
44+
continue;
45+
}
3746
}
3847
}
3948
}
4049
}
50+
return problems;
4151
}
4252

4353
TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const {
@@ -69,4 +79,4 @@ TCompactionTaskData TOneLayerPortions::DoGetOptimizationTask() const {
6979
return result;
7080
}
7181

72-
}
82+
} // namespace NKikimr::NOlap::NStorageOptimizer::NLCBuckets

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/common_level.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,8 @@ class TOneLayerPortions: public IPortionsLevel {
143143
return result;
144144
}
145145

146-
virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override;
146+
virtual std::vector<TPortionInfo::TPtr> DoModifyPortions(
147+
const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override;
147148

148149
virtual TCompactionTaskData DoGetOptimizationTask() const override;
149150

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/level/zero_level.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,9 @@ class TZeroLevelPortions: public IPortionsLevel {
3434
return 0;
3535
}
3636

37-
virtual void DoModifyPortions(const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override {
37+
virtual std::vector<TPortionInfo::TPtr> DoModifyPortions(
38+
const std::vector<TPortionInfo::TPtr>& add, const std::vector<TPortionInfo::TPtr>& remove) override {
39+
std::vector<TPortionInfo::TPtr> problems;
3840
const bool constructionFlag = Portions.empty();
3941
if (constructionFlag) {
4042
std::vector<TOrderedPortion> ordered;
@@ -54,6 +56,7 @@ class TZeroLevelPortions: public IPortionsLevel {
5456
for (auto&& i : remove) {
5557
AFL_VERIFY(Portions.erase(i));
5658
}
59+
return problems;
5760
}
5861

5962
virtual bool IsLocked(const std::shared_ptr<NDataLocks::TManager>& locksManager) const override {

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.h

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ class TOptimizerPlanner: public IOptimizerPlanner {
7070
virtual void DoModifyPortions(const THashMap<ui64, TPortionInfo::TPtr>& add, const THashMap<ui64, TPortionInfo::TPtr>& remove) override {
7171
std::vector<std::vector<TPortionInfo::TPtr>> removePortionsByLevel;
7272
removePortionsByLevel.resize(Levels.size());
73+
std::vector<std::vector<TPortionInfo::TPtr>> addPortionsByLevels;
74+
addPortionsByLevels.resize(Levels.size());
7375
for (auto&& [_, i] : remove) {
7476
if (i->GetProduced() == NPortion::EProduced::EVICTED) {
7577
continue;
@@ -78,22 +80,28 @@ class TOptimizerPlanner: public IOptimizerPlanner {
7880
AFL_VERIFY(i->GetCompactionLevel() < Levels.size());
7981
removePortionsByLevel[i->GetCompactionLevel()].emplace_back(i);
8082
}
81-
for (ui32 i = 0; i < Levels.size(); ++i) {
82-
Levels[i]->ModifyPortions({}, removePortionsByLevel[i]);
83-
}
83+
std::vector<TPortionInfo::TPtr> problemPortions;
8484
for (auto&& [_, i] : add) {
8585
if (i->GetProduced() == NPortion::EProduced::EVICTED) {
8686
continue;
8787
}
8888
PortionsInfo->AddPortion(i);
89+
addPortionsByLevels[i->GetMeta().GetCompactionLevel()].emplace_back(i);
8990
if (i->GetCompactionLevel() && i->GetCompactionLevel() >= Levels.size()) {
90-
i->MutableMeta().ResetCompactionLevel(0);
91+
problemPortions.emplace_back(i);
9192
}
93+
}
94+
for (ui32 i = 0; i < Levels.size(); ++i) {
95+
auto problems = Levels[i]->ModifyPortions(addPortionsByLevels[i], removePortionsByLevel[i]);
96+
problemPortions.insert(problemPortions.end(), problems.begin(), problems.end());
97+
}
98+
for (auto&& i : problemPortions) {
99+
i->MutableMeta().ResetCompactionLevel(0);
92100
if (!i->GetCompactionLevel() && i->GetPortionType() != EPortionType::Written) {
93101
i->MutableMeta().ResetCompactionLevel(GetAppropriateLevel(0, i->GetCompactionInfo()));
94102
}
95103
AFL_VERIFY(i->GetCompactionLevel() < Levels.size());
96-
Levels[i->GetMeta().GetCompactionLevel()]->ModifyPortions({ i }, {});
104+
AFL_VERIFY(Levels[i->GetMeta().GetCompactionLevel()]->ModifyPortions({ i }, {}).empty());
97105
}
98106
RefreshWeights();
99107
}

0 commit comments

Comments
 (0)