Skip to content

Commit 2fb8d68

Browse files
authored
[fix](compaction) fix time series compaction merge empty rowsets priority (#34677)
1 parent 7ed116b commit 2fb8d68

File tree

2 files changed

+98
-20
lines changed

2 files changed

+98
-20
lines changed

be/src/olap/cumulative_compaction_time_series_policy.cpp

Lines changed: 19 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -74,13 +74,6 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
7474
return 0;
7575
}
7676

77-
// If there is a continuous set of empty rowsets, prioritize merging.
78-
auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
79-
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
80-
if (!consecutive_empty_rowsets.empty()) {
81-
return score;
82-
}
83-
8477
// Condition 1: the size of input files for compaction meets the requirement of parameter compaction_goal_size
8578
int64_t compaction_goal_size_mbytes =
8679
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
@@ -127,6 +120,13 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
127120
tablet->set_last_cumu_compaction_success_time(now);
128121
}
129122

123+
// Condition 5: If there is a continuous set of empty rowsets, prioritize merging.
124+
auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
125+
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
126+
if (!consecutive_empty_rowsets.empty()) {
127+
return score;
128+
}
129+
130130
return 0;
131131
}
132132

@@ -216,19 +216,6 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
216216
return 0;
217217
}
218218

219-
// If their are many empty rowsets, maybe should be compacted
220-
auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
221-
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
222-
if (!consecutive_empty_rowsets.empty()) {
223-
VLOG_NOTICE << "tablet is " << tablet->tablet_id()
224-
<< ", there are too many consecutive empty rowsets, size is "
225-
<< consecutive_empty_rowsets.size();
226-
input_rowsets->clear();
227-
input_rowsets->insert(input_rowsets->end(), consecutive_empty_rowsets.begin(),
228-
consecutive_empty_rowsets.end());
229-
return 0;
230-
}
231-
232219
int64_t compaction_goal_size_mbytes =
233220
tablet->tablet_meta()->time_series_compaction_goal_size_mbytes();
234221

@@ -339,6 +326,18 @@ int TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
339326
}
340327

341328
input_rowsets->clear();
329+
// Condition 5: If their are many empty rowsets, maybe should be compacted
330+
auto consecutive_empty_rowsets = tablet->pick_first_consecutive_empty_rowsets(
331+
tablet->tablet_meta()->time_series_compaction_empty_rowsets_threshold());
332+
if (!consecutive_empty_rowsets.empty()) {
333+
VLOG_NOTICE << "tablet is " << tablet->tablet_id()
334+
<< ", there are too many consecutive empty rowsets, size is "
335+
<< consecutive_empty_rowsets.size();
336+
input_rowsets->clear();
337+
input_rowsets->insert(input_rowsets->end(), consecutive_empty_rowsets.begin(),
338+
consecutive_empty_rowsets.end());
339+
return 0;
340+
}
342341
*compaction_score = 0;
343342

344343
return 0;

be/test/olap/cumulative_compaction_time_series_policy_test.cpp

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,58 @@ class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
212212
rs_metas->push_back(ptr5);
213213
}
214214

215+
void init_all_rs_meta_empty_nonoverlapping(std::vector<RowsetMetaSharedPtr>* rs_metas) {
216+
RowsetMetaSharedPtr ptr1(new RowsetMeta());
217+
init_rs_meta(ptr1, 0, 1);
218+
ptr1->set_total_disk_size(1 * 1024);
219+
rs_metas->push_back(ptr1);
220+
221+
RowsetMetaSharedPtr ptr2(new RowsetMeta());
222+
init_rs_meta(ptr2, 2, 3);
223+
ptr2->set_total_disk_size(2 * 1024);
224+
rs_metas->push_back(ptr2);
225+
226+
RowsetMetaSharedPtr ptr3(new RowsetMeta());
227+
init_rs_meta(ptr3, 4, 4);
228+
ptr3->set_num_segments(0);
229+
rs_metas->push_back(ptr3);
230+
231+
RowsetMetaSharedPtr ptr4(new RowsetMeta());
232+
init_rs_meta(ptr4, 5, 5);
233+
ptr4->set_num_segments(0);
234+
rs_metas->push_back(ptr4);
235+
236+
RowsetMetaSharedPtr ptr5(new RowsetMeta());
237+
init_rs_meta(ptr5, 6, 6);
238+
ptr5->set_num_segments(0);
239+
rs_metas->push_back(ptr5);
240+
241+
RowsetMetaSharedPtr ptr6(new RowsetMeta());
242+
init_rs_meta(ptr6, 7, 7);
243+
ptr6->set_num_segments(0);
244+
rs_metas->push_back(ptr6);
245+
246+
RowsetMetaSharedPtr ptr7(new RowsetMeta());
247+
init_rs_meta(ptr7, 8, 8);
248+
ptr7->set_num_segments(0);
249+
rs_metas->push_back(ptr7);
250+
251+
RowsetMetaSharedPtr ptr8(new RowsetMeta());
252+
init_rs_meta(ptr8, 9, 9);
253+
ptr8->set_num_segments(0);
254+
rs_metas->push_back(ptr8);
255+
256+
RowsetMetaSharedPtr ptr9(new RowsetMeta());
257+
init_rs_meta(ptr9, 10, 10);
258+
ptr9->set_num_segments(0);
259+
rs_metas->push_back(ptr9);
260+
261+
RowsetMetaSharedPtr ptr10(new RowsetMeta());
262+
init_rs_meta(ptr10, 11, 11);
263+
ptr10->set_total_disk_size(2 * 1024);
264+
rs_metas->push_back(ptr10);
265+
}
266+
215267
void init_rs_meta_pick_empty(std::vector<RowsetMetaSharedPtr>* rs_metas) {
216268
RowsetMetaSharedPtr ptr1(new RowsetMeta());
217269
init_rs_meta(ptr1, 0, 1);
@@ -570,6 +622,33 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, _pick_missing_version_cumulativ
570622
compaction.find_longest_consecutive_version(&rowsets3, nullptr);
571623
EXPECT_EQ(0, rowsets3.size());
572624
}
625+
626+
TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_empty_rowsets) {
627+
std::vector<RowsetMetaSharedPtr> rs_metas;
628+
init_all_rs_meta_empty_nonoverlapping(&rs_metas);
629+
630+
for (auto& rowset : rs_metas) {
631+
static_cast<void>(_tablet_meta->add_rs_meta(rowset));
632+
}
633+
634+
TabletSharedPtr _tablet(new Tablet(_tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
635+
static_cast<void>(_tablet->init());
636+
_tablet->calculate_cumulative_point();
637+
638+
auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();
639+
640+
std::vector<RowsetSharedPtr> input_rowsets;
641+
Version last_delete_version {-1, -1};
642+
size_t compaction_score = 0;
643+
644+
_tablet->_cumulative_compaction_policy->pick_input_rowsets(
645+
_tablet.get(), candidate_rowsets, 10, 5, &input_rowsets, &last_delete_version,
646+
&compaction_score, config::enable_delete_when_cumu_compaction);
647+
648+
EXPECT_EQ(7, input_rowsets.size());
649+
EXPECT_EQ(-1, last_delete_version.first);
650+
EXPECT_EQ(-1, last_delete_version.second);
651+
}
573652
} // namespace doris
574653

575654
// @brief Test Stub

0 commit comments

Comments
 (0)