Skip to content

Commit ef64350

Browse files
authored
chore: ignore stream column compare in block compact mutator (#15446)
* fix recluster
* chore: ignore stream column compare in block compact mutator
* fix test
1 parent 3b3e308 commit ef64350

File tree

6 files changed

+27
-6
lines changed

6 files changed

+27
-6
lines changed

src/query/service/src/interpreters/interpreter_table_optimize.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -247,6 +247,7 @@ impl OptimizeTableInterpreter {
247247
mutator.remained_blocks,
248248
mutator.removed_segment_indexes,
249249
mutator.removed_segment_summary,
250+
self.plan.need_lock,
250251
)?;
251252

252253
build_res =

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ impl Interpreter for ReclusterTableInterpreter {
159159
mutator.remained_blocks,
160160
mutator.removed_segment_indexes,
161161
mutator.removed_segment_summary,
162+
true,
162163
)?;
163164

164165
let mut build_res =
@@ -222,6 +223,7 @@ impl Interpreter for ReclusterTableInterpreter {
222223
}
223224
}
224225

226+
#[allow(clippy::too_many_arguments)]
225227
pub fn build_recluster_physical_plan(
226228
tasks: Vec<ReclusterTask>,
227229
table_info: TableInfo,
@@ -230,6 +232,7 @@ pub fn build_recluster_physical_plan(
230232
remained_blocks: Vec<Arc<BlockMeta>>,
231233
removed_segment_indexes: Vec<usize>,
232234
removed_segment_summary: Statistics,
235+
need_lock: bool,
233236
) -> Result<PhysicalPlan> {
234237
let is_distributed = tasks.len() > 1;
235238
let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource {
@@ -258,6 +261,7 @@ pub fn build_recluster_physical_plan(
258261
removed_segment_indexes,
259262
removed_segment_summary,
260263
plan_id: u32::MAX,
264+
need_lock,
261265
}));
262266
plan.adjust_plan_id(&mut 0);
263267
Ok(plan)

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,13 @@ impl PipelineBuilder {
228228

229229
let snapshot_gen =
230230
MutationGenerator::new(recluster_sink.snapshot.clone(), MutationKind::Recluster);
231-
let lock = LockManager::create_table_lock(recluster_sink.table_info.clone())?;
231+
let lock = if recluster_sink.need_lock {
232+
Some(LockManager::create_table_lock(
233+
recluster_sink.table_info.clone(),
234+
)?)
235+
} else {
236+
None
237+
};
232238
self.main_pipeline.add_sink(|input| {
233239
CommitSink::try_create(
234240
table,
@@ -238,7 +244,7 @@ impl PipelineBuilder {
238244
snapshot_gen.clone(),
239245
input,
240246
None,
241-
Some(lock.clone()),
247+
lock.clone(),
242248
None,
243249
None,
244250
)

src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,4 +34,5 @@ pub struct ReclusterSink {
3434
pub remained_blocks: Vec<Arc<BlockMeta>>,
3535
pub removed_segment_indexes: Vec<usize>,
3636
pub removed_segment_summary: Statistics,
37+
pub need_lock: bool,
3738
}

src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_catalog::plan::Partitions;
2626
use databend_common_catalog::plan::PartitionsShuffleKind;
2727
use databend_common_exception::ErrorCode;
2828
use databend_common_exception::Result;
29+
use databend_common_expression::is_stream_column_id;
2930
use databend_common_expression::BlockThresholds;
3031
use databend_common_expression::ColumnId;
3132
use databend_common_metrics::storage::*;
@@ -195,6 +196,7 @@ impl BlockCompactMutator {
195196
let cluster = self.ctx.get_cluster();
196197
let max_threads = self.ctx.get_settings().get_max_threads()? as usize;
197198
let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() * max_threads {
199+
// NOTE: The snapshot schema does not contain the stream column.
198200
let column_ids = self
199201
.compact_params
200202
.base_snapshot
@@ -471,7 +473,14 @@ impl CompactTaskBuilder {
471473
}
472474

473475
fn check_compact(&self, block: &Arc<BlockMeta>) -> bool {
474-
let column_ids: HashSet<ColumnId> = block.col_metas.keys().cloned().collect();
476+
// The snapshot schema does not contain stream columns,
477+
// so the stream columns need to be filtered out.
478+
let column_ids = block
479+
.col_metas
480+
.keys()
481+
.filter(|id| !is_stream_column_id(**id))
482+
.cloned()
483+
.collect::<HashSet<_>>();
475484
if self.column_ids == column_ids {
476485
// Check if the block needs to be resort.
477486
self.cluster_key_id.is_some_and(|key| {

tests/sqllogictests/suites/base/09_fuse_engine/09_0011_change_tracking.test

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ select a, _origin_version is null, _origin_block_id is null, _origin_block_row_n
8181
2 0 0 1 0
8282
3 0 0 1 1
8383
5 0 0 0 0
84-
6 0 0 0 0
85-
7 0 0 1 0
84+
6 1 1 NULL 0
85+
7 1 1 NULL 0
8686

8787
statement ok
8888
create table t2(a int)
@@ -178,7 +178,7 @@ merge into t3 using t4 on t3.a = t4.a when matched and t4.a = 4 then delete when
178178
1 1 1
179179

180180
query IIBBII
181-
select a, b, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t3
181+
select a, b, _origin_version is null, _origin_block_id is null, _origin_block_row_num, _row_version from t3 order by a
182182
----
183183
0 0 0 0 0 0
184184
1 1 0 0 0 0

0 commit comments

Comments (0)