Skip to content

Commit d4eaec4

Browse files
correct and speed up compaction (#10867)
1 parent a0b88ca commit d4eaec4

File tree

11 files changed

+68
-40
lines changed

11 files changed

+68
-40
lines changed

ydb/core/tx/columnshard/engines/changes/compaction.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -30,7 +30,6 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
3030
void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
3131
TBase::DoStart(self);
3232

33-
// Y_ABORT_UNLESS(SwitchedPortions.size());
3433
THashMap<TString, THashSet<TBlobRange>> blobRanges;
3534
auto& index = self.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
3635
for (const auto& p : SwitchedPortions) {

ydb/core/tx/columnshard/engines/changes/general_compaction.cpp

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -237,26 +237,35 @@ std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCo
237237
}
238238

239239
ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) {
240-
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16));
240+
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo.GetTotalBlobBytes();
241241
++PortionsCount;
242-
THashMap<ui32, ui64> maxChunkSizeByColumn;
242+
auto it = MaxMemoryByColumnChunk.begin();
243+
SumMemoryDelta = 0;
244+
const auto advanceIterator = [&](const ui32 columnId, const ui64 maxColumnChunkRawBytes) {
245+
while (it != MaxMemoryByColumnChunk.end() && it->ColumnId < columnId) {
246+
++it;
247+
}
248+
if (it == MaxMemoryByColumnChunk.end() || columnId < it->ColumnId) {
249+
it = MaxMemoryByColumnChunk.insert(it, TColumnInfo(columnId));
250+
}
251+
it->MemoryUsage += maxColumnChunkRawBytes;
252+
SumMemoryDelta = std::max(SumMemoryDelta, it->MemoryUsage);
253+
};
254+
ui32 columnId = 0;
255+
ui64 maxChunkSize = 0;
243256
for (auto&& i : portionInfo.GetRecords()) {
244-
SumMemoryFix += i.BlobRange.Size;
245-
auto it = maxChunkSizeByColumn.find(i.GetColumnId());
246-
if (it == maxChunkSizeByColumn.end()) {
247-
maxChunkSizeByColumn.emplace(i.GetColumnId(), i.GetMeta().GetRawBytes());
248-
} else {
249-
if (it->second < i.GetMeta().GetRawBytes()) {
250-
it->second = i.GetMeta().GetRawBytes();
257+
if (columnId != i.GetColumnId()) {
258+
if (columnId) {
259+
advanceIterator(columnId, maxChunkSize);
251260
}
261+
columnId = i.GetColumnId();
262+
maxChunkSize = 0;
263+
}
264+
if (maxChunkSize < i.GetMeta().GetRawBytes()) {
265+
maxChunkSize = i.GetMeta().GetRawBytes();
252266
}
253267
}
254-
255-
SumMemoryDelta = 0;
256-
for (auto&& i : maxChunkSizeByColumn) {
257-
MaxMemoryByColumnChunk[i.first] += i.second;
258-
SumMemoryDelta = std::max(SumMemoryDelta, MaxMemoryByColumnChunk[i.first]);
259-
}
268+
advanceIterator(columnId, maxChunkSize);
260269

261270
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)(
262271
"portion_info", portionInfo.DebugString());

ydb/core/tx/columnshard/engines/changes/general_compaction.h

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,17 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
6666
ui64 SumMemoryDelta = 0;
6767
ui64 SumMemoryFix = 0;
6868
ui32 PortionsCount = 0;
69-
THashMap<ui32, ui64> MaxMemoryByColumnChunk;
69+
class TColumnInfo {
70+
public:
71+
const ui32 ColumnId;
72+
ui64 MemoryUsage = 0;
73+
TColumnInfo(const ui32 columnId)
74+
: ColumnId(columnId)
75+
{
76+
77+
}
78+
};
79+
std::list<TColumnInfo> MaxMemoryByColumnChunk;
7080

7181
public:
7282
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override;

ydb/core/tx/columnshard/engines/changes/with_appended.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
114114
}
115115

116116
void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
117+
AFL_VERIFY(PortionsToRemove.size() + PortionsToMove.size() + AppendedPortions.size());
117118
for (auto&& i : AppendedPortions) {
118119
i.GetPortionConstructor().SetPortionId(context.NextPortionId());
119120
i.GetPortionConstructor().MutableMeta().SetCompactionLevel(TargetCompactionLevel.value_or(0));

ydb/core/tx/columnshard/engines/portions/constructor.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,8 +83,11 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
8383
}
8484

8585
result.Indexes = std::move(Indexes);
86+
result.Indexes.shrink_to_fit();
8687
result.Records = std::move(Records);
88+
result.Records.shrink_to_fit();
8789
result.BlobIds = std::move(BlobIds);
90+
result.BlobIds.shrink_to_fit();
8891
result.Precalculate();
8992
return result;
9093
}

ydb/core/tx/columnshard/engines/storage/chunks/column.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@ class TChunkPreparation: public IPortionColumnChunk {
6262
, Record(address, column)
6363
, ColumnInfo(columnInfo) {
6464
Y_ABORT_UNLESS(column->GetRecordsCount());
65-
First = column->GetScalar(0);
66-
Last = column->GetScalar(column->GetRecordsCount() - 1);
65+
if (ColumnInfo.GetPKColumnIndex()) {
66+
First = column->GetScalar(0);
67+
Last = column->GetScalar(column->GetRecordsCount() - 1);
68+
}
6769
Record.BlobRange.Size = data.size();
6870
}
6971
};

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ class TCompactionTaskData {
140140
StopSeparation = point;
141141
}
142142

143-
std::vector<std::shared_ptr<TPortionInfo>> GetRepackPortions(const ui32 levelIdx) const {
143+
std::vector<std::shared_ptr<TPortionInfo>> GetRepackPortions(const ui32 /*levelIdx*/) const {
144144
std::vector<std::shared_ptr<TPortionInfo>> result;
145145
if (MemoryUsage > ((ui64)1 << 30)) {
146146
auto predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor();
@@ -154,7 +154,7 @@ class TCompactionTaskData {
154154
}
155155
}
156156
return result;
157-
} else if (levelIdx == 0) {
157+
} else {
158158
return Portions;
159159
}
160160
auto moveIds = GetMovePortionIds();

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,10 @@ TOptimizerPlanner::TOptimizerPlanner(
1818
const ui64 maxPortionBlobBytes = (ui64)1 << 20;
1919
Levels.emplace_back(
2020
std::make_shared<TLevelPortions>(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2)));
21-
Levels.emplace_back(
22-
std::make_shared<TLevelPortions>(1, 0.1, maxPortionBlobBytes, Levels.back(), PortionsInfo, Counters->GetLevelCounters(1)));
2321
*/
24-
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, nullptr, Counters->GetLevelCounters(2)));
25-
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0)));
22+
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max()));
23+
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max()));
24+
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180)));
2625
std::reverse(Levels.begin(), Levels.end());
2726
RefreshWeights();
2827
}
@@ -34,14 +33,14 @@ std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TOptimizerPlanner::DoGetOp
3433
auto data = level->GetOptimizationTask();
3534
TSaverContext saverContext(StoragesManager);
3635
std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result;
37-
if (level->GetLevelId() == 0) {
38-
result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
39-
granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
40-
} else {
36+
// if (level->GetLevelId() == 0) {
4137
result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
4238
granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
43-
result->AddMovePortions(data.GetMovePortions());
44-
}
39+
// } else {
40+
// result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
41+
// granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
42+
// result->AddMovePortions(data.GetMovePortions());
43+
// }
4544
result->SetTargetCompactionLevel(data.GetTargetCompactionLevel());
4645
auto levelPortions = std::dynamic_pointer_cast<TLevelPortions>(Levels[data.GetTargetCompactionLevel()]);
4746
if (levelPortions) {

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,16 @@ ui64 TZeroLevelPortions::DoGetWeight() const {
2525
if (!NextLevel || Portions.size() < 10) {
2626
return 0;
2727
}
28-
if (TInstant::Now() - *PredOptimization < TDuration::Seconds(180)) {
29-
if (PortionsInfo.GetCount() <= 100 || PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
30-
return 0;
31-
}
32-
} else {
33-
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (512 << 10)) {
28+
if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) {
29+
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
3430
return 0;
3531
}
3632
}
3733

38-
THashSet<ui64> portionIds;
3934
const ui64 affectedRawBytes =
4035
NextLevel->GetAffectedPortionBytes(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
4136
/*
37+
THashSet<ui64> portionIds;
4238
auto chain =
4339
targetLevel->GetAffectedPortions(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
4440
ui64 affectedRawBytes = 0;

ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ class TZeroLevelPortions: public IPortionsLevel {
88
private:
99
using TBase = IPortionsLevel;
1010
const TLevelCounters LevelCounters;
11+
const TDuration DurationToDrop;
1112
class TOrderedPortion {
1213
private:
1314
YDB_READONLY_DEF(std::shared_ptr<TPortionInfo>, Portion);
@@ -87,9 +88,11 @@ class TZeroLevelPortions: public IPortionsLevel {
8788
virtual TCompactionTaskData DoGetOptimizationTask() const override;
8889

8990
public:
90-
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters)
91+
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop)
9192
: TBase(levelIdx, nextLevel)
92-
, LevelCounters(levelCounters) {
93+
, LevelCounters(levelCounters)
94+
, DurationToDrop(durationToDrop)
95+
{
9396
}
9497
};
9598

0 commit comments

Comments
 (0)