
Commit 6643b25

hilbert recluster: support block stream write
1 parent ebeab93 commit 6643b25

9 files changed: +171 -67 lines

src/query/expression/src/utils/block_thresholds.rs

Lines changed: 2 additions & 2 deletions
@@ -152,8 +152,8 @@ impl BlockThresholds {
         let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
         // Adjust the number of blocks based on block size thresholds.
-        let max_bytes_per_block = (4 * self.min_bytes_per_block).min(400 * 1024 * 1024);
-        let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
+        let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
+        let min_bytes_per_block = self.min_bytes_per_block.min(100 * 1024 * 1024);
         let block_nums = if bytes_per_block > max_bytes_per_block {
             // Case 1: If the block size is too bigger.
             total_bytes.div_ceil(max_bytes_per_block)
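
The new bounds take the configured max_bytes_per_block directly (previously derived as four times the min) and raise the floor's cap from 50 MiB to 100 MiB. A minimal standalone sketch of how the clamped bounds drive the block count (a hypothetical helper, not the Databend API; the undersized-block branch is an assumption extrapolated from the truncated hunk):

    fn calc_block_nums(
        total_bytes: usize,
        block_num_by_compressed: usize,
        min_bytes_per_block: usize,
        max_bytes_per_block: usize,
    ) -> usize {
        let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
        // Post-change bounds: cap the target block size at 400 MiB,
        // and cap the floor at 100 MiB.
        let max_bytes = max_bytes_per_block.min(400 * 1024 * 1024);
        let min_bytes = min_bytes_per_block.min(100 * 1024 * 1024);
        if bytes_per_block > max_bytes {
            // Case 1: blocks would be too large, so split into more blocks.
            total_bytes.div_ceil(max_bytes)
        } else if bytes_per_block < min_bytes {
            // Assumed case: blocks would be too small, so merge into fewer.
            total_bytes.div_ceil(min_bytes)
        } else {
            block_num_by_compressed
        }
    }

For instance, 2 GiB of data against the 400 MiB cap yields div_ceil(2048 MiB, 400 MiB) = 6 blocks.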

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 35 additions & 21 deletions
@@ -28,6 +28,7 @@ use databend_common_catalog::table::TableExt;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::type_check::check_function;
+use databend_common_expression::types::NumberScalar;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Scalar;
 use databend_common_functions::BUILTIN_FUNCTIONS;

@@ -52,6 +53,8 @@ use databend_common_sql::plans::plan_hilbert_sql;
 use databend_common_sql::plans::replace_with_constant;
 use databend_common_sql::plans::set_update_stream_columns;
 use databend_common_sql::plans::BoundColumnRef;
+use databend_common_sql::plans::ConstantExpr;
+use databend_common_sql::plans::FunctionCall;
 use databend_common_sql::plans::Plan;
 use databend_common_sql::plans::ReclusterPlan;
 use databend_common_sql::IdentifierNormalizer;

@@ -325,19 +328,7 @@ impl ReclusterTableInterpreter {
             block_thresholds.calc_rows_for_recluster(total_rows, total_bytes, total_compressed);

         // Calculate initial partition count based on data volume and block size
-        let mut total_partitions = std::cmp::max(total_rows / rows_per_block, 1);
-
-        // Adjust number of partitions according to the block size thresholds
-        if total_partitions < block_thresholds.block_per_segment
-            && block_thresholds.check_perfect_segment(
-                block_thresholds.block_per_segment, // this effectively by-pass the total_blocks criteria
-                total_rows,
-                total_bytes,
-                total_compressed,
-            )
-        {
-            total_partitions = block_thresholds.block_per_segment;
-        }
+        let total_partitions = std::cmp::max(total_rows / rows_per_block, 1);

         warn!(
             "Do hilbert recluster, total_bytes: {}, total_rows: {}, total_partitions: {}",
@@ -439,15 +430,37 @@ impl ReclusterTableInterpreter {

         // For distributed execution, add an exchange operator to distribute work
         if is_distributed {
+            let nodes_num = cluster.nodes.len() as u64;
+            let scalar_expr = ScalarExpr::FunctionCall(FunctionCall {
+                span: None,
+                func_name: "div".to_string(),
+                params: vec![],
+                arguments: vec![
+                    ScalarExpr::FunctionCall(FunctionCall {
+                        span: None,
+                        func_name: "multiply".to_string(),
+                        params: vec![],
+                        arguments: vec![
+                            ScalarExpr::BoundColumnRef(BoundColumnRef {
+                                span: None,
+                                column: bind_context.columns.last().unwrap().clone(),
+                            }),
+                            ScalarExpr::ConstantExpr(ConstantExpr {
+                                span: None,
+                                value: Scalar::Number(NumberScalar::UInt64(nodes_num)),
+                            }),
+                        ],
+                    }),
+                    ScalarExpr::ConstantExpr(ConstantExpr {
+                        span: None,
+                        value: Scalar::Number(NumberScalar::UInt64(total_partitions as u64)),
+                    }),
+                ],
+            });
+
             // Create an expression for the partition column,
             // i.e.`range_partition_id(hilbert_range_index({hilbert_keys_str}), [...]) AS _predicate`
-            let expr = scalar_expr_to_remote_expr(
-                &ScalarExpr::BoundColumnRef(BoundColumnRef {
-                    span: None,
-                    column: bind_context.columns.last().unwrap().clone(),
-                }),
-                plan.output_schema()?.as_ref(),
-            )?;
+            let expr = scalar_expr_to_remote_expr(&scalar_expr, plan.output_schema()?.as_ref())?;

             // Add exchange operator for data distribution,
             // shuffling data based on the hash of range partition IDs derived from the Hilbert index.
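
The nested call builds div(multiply(range_id, nodes_num), total_partitions): each row's range partition id is scaled into [0, nodes_num), so contiguous runs of Hilbert ranges share a shuffle key. A minimal sketch of the per-row arithmetic (hypothetical helper; assumes div and multiply evaluate as unsigned integer arithmetic here):

    fn shuffle_key(range_id: u64, nodes_num: u64, total_partitions: u64) -> u64 {
        // multiply(range_id, nodes_num), then div(..., total_partitions)
        range_id * nodes_num / total_partitions
    }

For example, with nodes_num = 4 and total_partitions = 64, range ids 0..=15 all produce key 0 and 16..=31 produce key 1, so neighboring ranges land on the same node after the hash shuffle.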
@@ -471,7 +484,8 @@ impl ReclusterTableInterpreter {
             plan_id: 0,
             input: plan,
             table_info: table_info.clone(),
-            num_partitions: total_partitions,
+            range_start: 0,
+            range_width: total_partitions,
             table_meta_timestamps,
             rows_per_block,
         }));

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 33 additions & 12 deletions
@@ -15,13 +15,15 @@
 use std::sync::atomic;
 use std::sync::atomic::AtomicUsize;

+use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
 use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_transforms::MemorySettings;
 use databend_common_sql::executor::physical_plans::HilbertPartition;
 use databend_common_sql::executor::physical_plans::MutationKind;
+use databend_common_storages_fuse::operations::TransformBlockWriter;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
 use databend_common_storages_fuse::FuseTable;

@@ -43,10 +45,12 @@ impl PipelineBuilder {
             .ctx
             .build_table_by_table_info(&partition.table_info, None)?;
         let table = FuseTable::try_from_table(table.as_ref())?;
+        let enable_stream_writer = self.ctx.get_settings().get_enable_block_stream_write()?
+            && table.storage_format_as_parquet();

         self.main_pipeline.exchange(
             num_processors,
-            HilbertPartitionExchange::create(partition.num_partitions),
+            HilbertPartitionExchange::create(partition.range_start, partition.range_width),
         );

         let settings = self.ctx.get_settings();

@@ -77,26 +81,43 @@ impl PipelineBuilder {
                     &settings,
                     processor_id.fetch_add(1, atomic::Ordering::AcqRel),
                     num_processors,
-                    partition.num_partitions,
+                    partition.range_width,
                     window_spill_settings.clone(),
                     disk_spill.clone(),
-                    CompactStrategy::new(partition.rows_per_block, max_bytes_per_block),
+                    CompactStrategy::new(
+                        partition.rows_per_block,
+                        max_bytes_per_block,
+                        enable_stream_writer,
+                    ),
                 )?,
             )))
         })?;

-        self.main_pipeline
-            .add_transform(|transform_input_port, transform_output_port| {
-                let proc = TransformSerializeBlock::try_create(
+        if enable_stream_writer {
+            self.main_pipeline.add_transform(|input, output| {
+                TransformBlockWriter::try_create(
                     self.ctx.clone(),
-                    transform_input_port,
-                    transform_output_port,
+                    input,
+                    output,
                     table,
-                    ClusterStatsGenerator::default(),
-                    MutationKind::Recluster,
                     partition.table_meta_timestamps,
-                )?;
-                proc.into_processor()
+                    false,
+                )
             })
+        } else {
+            self.main_pipeline
+                .add_transform(|transform_input_port, transform_output_port| {
+                    let proc = TransformSerializeBlock::try_create(
+                        self.ctx.clone(),
+                        transform_input_port,
+                        transform_output_port,
+                        table,
+                        ClusterStatsGenerator::default(),
+                        MutationKind::Recluster,
+                        partition.table_meta_timestamps,
+                    )?;
+                    proc.into_processor()
+                })
+        }
     }
 }
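
Design note: when the stream writer is active, block sizing moves downstream; CompactStrategy becomes a pass-through (see the next file) and TransformBlockWriter emits blocks incrementally. The Parquet-only guard presumably exists because the stream writer produces Parquet output natively; other storage formats, and deployments with the setting off, keep the existing TransformSerializeBlock path.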

src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs

Lines changed: 11 additions & 1 deletion
@@ -27,13 +27,19 @@ pub trait DataProcessorStrategy: Send + Sync + 'static {
 pub struct CompactStrategy {
     max_bytes_per_block: usize,
     max_rows_per_block: usize,
+    enable_stream_writer: bool,
 }

 impl CompactStrategy {
-    pub fn new(max_rows_per_block: usize, max_bytes_per_block: usize) -> Self {
+    pub fn new(
+        max_rows_per_block: usize,
+        max_bytes_per_block: usize,
+        enable_stream_writer: bool,
+    ) -> Self {
         Self {
             max_bytes_per_block,
             max_rows_per_block,
+            enable_stream_writer,
         }
     }

@@ -50,6 +56,10 @@ impl DataProcessorStrategy for CompactStrategy {
     const NAME: &'static str = "Compact";

     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
+        if self.enable_stream_writer {
+            return Ok(data_blocks);
+        }
+
         let blocks_num = data_blocks.len();
         if blocks_num < 2 {
             return Ok(data_blocks);
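
A minimal usage sketch of the extended constructor (threshold values are illustrative, not defaults from the source):

    // Pass-through mode: blocks are handed to the stream writer unmodified.
    let streaming = CompactStrategy::new(1_000_000, 128 * 1024 * 1024, true);

    // Compacting mode: small blocks are still merged up to the row/byte
    // thresholds before serialization, as before.
    let compacting = CompactStrategy::new(1_000_000, 128 * 1024 * 1024, false);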

src/query/service/src/pipelines/processors/transforms/window/partition/hilbert_partition_exchange.rs

Lines changed: 18 additions & 12 deletions
@@ -25,12 +25,13 @@ use databend_common_pipeline_core::processors::Exchange;
 use crate::pipelines::processors::transforms::WindowPartitionMeta;

 pub struct HilbertPartitionExchange {
-    num_partitions: usize,
+    start: u64,
+    width: usize,
 }

 impl HilbertPartitionExchange {
-    pub fn create(num_partitions: usize) -> Arc<HilbertPartitionExchange> {
-        Arc::new(HilbertPartitionExchange { num_partitions })
+    pub fn create(start: u64, width: usize) -> Arc<HilbertPartitionExchange> {
+        Arc::new(HilbertPartitionExchange { start, width })
     }
 }

@@ -48,20 +49,25 @@ impl Exchange for HilbertPartitionExchange {
         // Scatter the data block to different partitions.
         let indices = range_ids
             .iter()
-            .map(|&id| (id % self.num_partitions as u64) as u16)
+            .map(|&id| (id - self.start) as u16)
             .collect::<Vec<_>>();
         data_block.pop_columns(1);
-        let scatter_indices =
-            DataBlock::divide_indices_by_scatter_size(&indices, self.num_partitions);
+
+        let scatter_indices = DataBlock::divide_indices_by_scatter_size(&indices, self.width);
         // Partition the data blocks to different processors.
+        let base = self.width / n;
+        let remainder = self.width % n;
         let mut output_data_blocks = vec![vec![]; n];
-        for (partition_id, indices) in scatter_indices.iter().take(self.num_partitions).enumerate()
-        {
-            if indices.is_empty() {
-                continue;
+        for (partition_id, indices) in scatter_indices.into_iter().take(self.width).enumerate() {
+            if !indices.is_empty() {
+                let target = if partition_id < remainder * (base + 1) {
+                    partition_id / (base + 1)
+                } else {
+                    (partition_id - remainder) / base
+                };
+                let block = data_block.take_with_optimize_size(&indices)?;
+                output_data_blocks[target].push((partition_id, block));
             }
-            let block = data_block.take_with_optimize_size(indices)?;
-            output_data_blocks[partition_id % n].push((partition_id, block));
         }

         // Union data blocks for each processor.
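
Two behavior changes are visible here: ids are rebased with (id - self.start), which assumes every incoming range id lies in [start, start + width), and the old round-robin partition_id % n gives way to a contiguous assignment so each processor receives an unbroken run of partition ids (useful when downstream writers expect range order). A minimal standalone sketch of the new mapping (hypothetical helper):

    fn target_processor(partition_id: usize, width: usize, n: usize) -> usize {
        let base = width / n;      // minimum partitions per processor
        let remainder = width % n; // the first `remainder` processors take one extra
        if partition_id < remainder * (base + 1) {
            partition_id / (base + 1)
        } else {
            (partition_id - remainder) / base
        }
    }

With width = 10 and n = 3, processors 0, 1, and 2 receive partitions 0..=3, 4..=6, and 7..=9 respectively.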

src/query/service/src/schedulers/fragments/fragmenter.rs

Lines changed: 8 additions & 9 deletions
@@ -26,6 +26,7 @@ use databend_common_sql::executor::physical_plans::ExchangeSink;
 use databend_common_sql::executor::physical_plans::ExchangeSource;
 use databend_common_sql::executor::physical_plans::FragmentKind;
 use databend_common_sql::executor::physical_plans::HashJoin;
+use databend_common_sql::executor::physical_plans::HilbertPartition;
 use databend_common_sql::executor::physical_plans::MutationSource;
 use databend_common_sql::executor::physical_plans::Recluster;
 use databend_common_sql::executor::physical_plans::ReplaceInto;

@@ -41,7 +42,6 @@ use crate::servers::flight::v1::exchange::DataExchange;
 use crate::servers::flight::v1::exchange::MergeExchange;
 use crate::servers::flight::v1::exchange::ShuffleDataExchange;
 use crate::sessions::QueryContext;
-use crate::sql::executor::physical_plans::Mutation;
 use crate::sql::executor::PhysicalPlan;

 /// Visitor to split a `PhysicalPlan` into fragments.

@@ -67,6 +67,7 @@ enum State {
     Compact,
     Recluster,
     Other,
+    HilbertRecluster,
 }

 impl Fragmenter {

@@ -170,14 +171,6 @@ impl PhysicalPlanReplacer for Fragmenter {
         Ok(PhysicalPlan::MutationSource(plan.clone()))
     }

-    fn replace_mutation(&mut self, plan: &Mutation) -> Result<PhysicalPlan> {
-        let input = self.replace(&plan.input)?;
-        Ok(PhysicalPlan::Mutation(Box::new(Mutation {
-            input: Box::new(input),
-            ..plan.clone()
-        })))
-    }
-
     fn replace_replace_into(&mut self, plan: &ReplaceInto) -> Result<PhysicalPlan> {
         let input = self.replace(&plan.input)?;
         self.state = State::ReplaceInto;

@@ -209,6 +202,11 @@ impl PhysicalPlanReplacer for Fragmenter {
         Ok(PhysicalPlan::Recluster(Box::new(plan.clone())))
     }

+    fn replace_hilbert_serialize(&mut self, plan: &HilbertPartition) -> Result<PhysicalPlan> {
+        self.state = State::HilbertRecluster;
+        Ok(PhysicalPlan::HilbertPartition(Box::new(plan.clone())))
+    }
+
     fn replace_compact_source(&mut self, plan: &CompactSource) -> Result<PhysicalPlan> {
         self.state = State::Compact;
         Ok(PhysicalPlan::CompactSource(Box::new(plan.clone())))

@@ -310,6 +308,7 @@ impl PhysicalPlanReplacer for Fragmenter {
             State::ReplaceInto => FragmentType::ReplaceInto,
             State::Compact => FragmentType::Compact,
             State::Recluster => FragmentType::Recluster,
+            State::HilbertRecluster => FragmentType::HilbertRecluster,
         };
         self.state = State::Other;
         let exchange = Self::get_exchange(self.ctx.clone(), &plan)?;
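
Design note: fragments that contain a HilbertPartition are now tagged with the dedicated HilbertRecluster fragment type, mirroring how Compact, Recluster, and ReplaceInto fragments are classified. Removing the replace_mutation override means Mutation plans fall back to the trait's default traversal, which presumably rebuilds the node around the replaced input just as the deleted code did.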
