Commit c822dcc

add transform hilbert collect

1 parent a116b11 commit c822dcc

12 files changed: +458 -114 lines changed

src/query/expression/src/utils/block_thresholds.rs

Lines changed: 49 additions & 0 deletions

```diff
@@ -166,4 +166,53 @@ impl BlockThresholds {
         };
         total_rows.div_ceil(block_nums.max(1)).max(1)
     }
+
+    /// Calculates the optimal number of partitions (blocks) based on total data size and row count.
+    ///
+    /// # Parameters
+    /// - `total_rows`: The total number of rows in the data.
+    /// - `total_bytes`: The total uncompressed size of the data in bytes.
+    /// - `total_compressed`: The total compressed size of the data in bytes.
+    ///
+    /// # Returns
+    /// - The calculated number of partitions (blocks) needed.
+    #[inline]
+    pub fn calc_partitions_for_recluster(
+        &self,
+        total_rows: usize,
+        total_bytes: usize,
+        total_compressed: usize,
+    ) -> usize {
+        // If the data is already compact enough, return a single partition.
+        if self.check_for_compact(total_rows, total_bytes)
+            && total_compressed < 2 * self.min_compressed_per_block
+        {
+            return 1;
+        }
+
+        // Estimate the number of blocks based on row count and compressed size.
+        let by_rows = std::cmp::max(total_rows / self.max_rows_per_block, 1);
+        let by_compressed = total_compressed / self.max_compressed_per_block;
+        // If row-based block count is greater, use max rows per block as limit.
+        if by_rows >= by_compressed {
+            return by_rows;
+        }
+
+        // Adjust block count based on byte size thresholds.
+        let bytes_per_block = total_bytes.div_ceil(by_compressed);
+        let max_bytes = self.max_bytes_per_block.min(400 * 1024 * 1024);
+        let min_bytes = max_bytes / 2;
+        let total_partitions = if bytes_per_block > max_bytes {
+            // Block size is too large.
+            total_bytes / max_bytes
+        } else if bytes_per_block < min_bytes {
+            // Block size is too small.
+            total_bytes / min_bytes
+        } else {
+            // Block size is acceptable.
+            by_compressed
+        };
+
+        std::cmp::max(total_partitions, 1)
+    }
 }
```
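To make the sizing logic above concrete, here is a standalone sketch with hypothetical threshold constants (`MAX_ROWS`, `MAX_COMPRESSED`, and `MAX_BYTES` are illustrative values, not Databend's actual defaults, and the `check_for_compact` early return is omitted):

```rust
const MAX_ROWS: usize = 1_000_000; // assumed max rows per block
const MAX_COMPRESSED: usize = 16 * 1024 * 1024; // assumed 16 MiB compressed per block
const MAX_BYTES: usize = 100 * 1024 * 1024; // assumed 100 MiB uncompressed per block

fn calc_partitions(total_rows: usize, total_bytes: usize, total_compressed: usize) -> usize {
    let by_rows = (total_rows / MAX_ROWS).max(1);
    let by_compressed = total_compressed / MAX_COMPRESSED;
    // Row count is the binding constraint.
    if by_rows >= by_compressed {
        return by_rows;
    }
    // Check what the compressed-size estimate implies per uncompressed block.
    let bytes_per_block = total_bytes.div_ceil(by_compressed);
    let min_bytes = MAX_BYTES / 2;
    let partitions = if bytes_per_block > MAX_BYTES {
        total_bytes / MAX_BYTES // estimate yields oversized blocks; cap by bytes
    } else if bytes_per_block < min_bytes {
        total_bytes / min_bytes // estimate yields undersized blocks; floor by bytes
    } else {
        by_compressed // estimate falls in the acceptable band
    };
    partitions.max(1)
}

fn main() {
    // 20M rows, 4 GiB raw, 640 MiB compressed:
    // by_rows = 20, by_compressed = 40, bytes_per_block ≈ 102.4 MiB > 100 MiB,
    // so the byte cap wins: 4 GiB / 100 MiB = 40 partitions.
    assert_eq!(calc_partitions(20_000_000, 4 << 30, 640 << 20), 40);
}
```

The two byte-based divisions keep the result anchored to the uncompressed byte budget whenever the compressed-size estimate would produce blocks outside the [max/2, max] band.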

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 9 additions & 5 deletions

```diff
@@ -323,12 +323,15 @@ impl ReclusterTableInterpreter
         let total_rows = recluster_info.removed_statistics.row_count as usize;
         let total_compressed = recluster_info.removed_statistics.compressed_byte_size as usize;
 
-        // Determine rows per block based on data size and compression ratio
-        let rows_per_block =
-            block_thresholds.calc_rows_for_recluster(total_rows, total_bytes, total_compressed);
-
+        // Determine rows per block based on data size and compression ratio,
         // Calculate initial partition count based on data volume and block size
-        let total_partitions = std::cmp::max(total_rows / rows_per_block, 1);
+        let total_partitions = block_thresholds.calc_partitions_for_recluster(
+            total_rows,
+            total_bytes,
+            total_compressed,
+        );
+        let bytes_per_block = (total_bytes / total_partitions).max(1);
+        let rows_per_block = (total_rows / total_partitions).max(1);
 
         warn!(
             "Do hilbert recluster, total_bytes: {}, total_rows: {}, total_partitions: {}",
@@ -487,6 +490,7 @@ impl ReclusterTableInterpreter
             range_start: 0,
             range_width: total_partitions,
             table_meta_timestamps,
+            bytes_per_block,
             rows_per_block,
         }));
```
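With the partition count fixed, the per-block targets fall out as simple quotients. A quick worked example (all numbers assumed for illustration): 8 GiB and 80M rows spread over 64 partitions gives 128 MiB and 1.25M rows per block:

```rust
fn main() {
    // Assumed inputs for illustration only.
    let (total_bytes, total_rows, total_partitions) = (8usize << 30, 80_000_000usize, 64);
    let bytes_per_block = (total_bytes / total_partitions).max(1);
    let rows_per_block = (total_rows / total_partitions).max(1);
    assert_eq!(bytes_per_block, 128 << 20); // 128 MiB
    assert_eq!(rows_per_block, 1_250_000);
}
```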

src/query/service/src/pipelines/builders/builder_hilbert_partition.rs

Lines changed: 36 additions & 29 deletions

```diff
@@ -18,7 +18,6 @@ use std::sync::atomic::AtomicUsize;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_context::TableContext;
 use databend_common_exception::Result;
-use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
 use databend_common_pipeline_core::processors::ProcessorPtr;
 use databend_common_pipeline_transforms::MemorySettings;
 use databend_common_sql::executor::physical_plans::HilbertPartition;
@@ -27,12 +26,12 @@ use databend_common_storages_fuse::operations::TransformBlockWriter;
 use databend_common_storages_fuse::operations::TransformSerializeBlock;
 use databend_common_storages_fuse::statistics::ClusterStatsGenerator;
 use databend_common_storages_fuse::FuseTable;
-use databend_common_storages_fuse::FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD;
 use databend_storages_common_cache::TempDirManager;
 
 use crate::pipelines::memory_settings::MemorySettingsExt;
 use crate::pipelines::processors::transforms::CompactStrategy;
 use crate::pipelines::processors::transforms::HilbertPartitionExchange;
+use crate::pipelines::processors::transforms::TransformHilbertCollect;
 use crate::pipelines::processors::transforms::TransformWindowPartitionCollect;
 use crate::pipelines::PipelineBuilder;
 use crate::spillers::SpillerDiskConfig;
@@ -65,35 +64,25 @@ impl PipelineBuilder {
 
         let window_spill_settings = MemorySettings::from_window_settings(&self.ctx)?;
         let processor_id = AtomicUsize::new(0);
-        let max_bytes_per_block = std::cmp::min(
-            4 * table.get_option(
-                FUSE_OPT_KEY_BLOCK_IN_MEM_SIZE_THRESHOLD,
-                DEFAULT_BLOCK_BUFFER_SIZE,
-            ),
-            400 * 1024 * 1024,
-        );
-        self.main_pipeline.add_transform(|input, output| {
-            Ok(ProcessorPtr::create(Box::new(
-                TransformWindowPartitionCollect::new(
-                    self.ctx.clone(),
-                    input,
-                    output,
-                    &settings,
-                    processor_id.fetch_add(1, atomic::Ordering::AcqRel),
-                    num_processors,
-                    partition.range_width,
-                    window_spill_settings.clone(),
-                    disk_spill.clone(),
-                    CompactStrategy::new(
-                        partition.rows_per_block,
-                        max_bytes_per_block,
-                        enable_stream_writer,
-                    ),
-                )?,
-            )))
-        })?;
 
         if enable_stream_writer {
+            self.main_pipeline.add_transform(|input, output| {
+                Ok(ProcessorPtr::create(Box::new(
+                    TransformHilbertCollect::new(
+                        self.ctx.clone(),
+                        input,
+                        output,
+                        &settings,
+                        processor_id.fetch_add(1, atomic::Ordering::AcqRel),
+                        num_processors,
+                        partition.range_width,
+                        window_spill_settings.clone(),
+                        disk_spill.clone(),
+                        partition.bytes_per_block,
+                    )?,
+                )))
+            })?;
+
             self.main_pipeline.add_transform(|input, output| {
                 TransformBlockWriter::try_create(
                     self.ctx.clone(),
@@ -103,9 +92,27 @@ impl PipelineBuilder {
                     table,
                     partition.table_meta_timestamps,
                     false,
+                    Some(partition.bytes_per_block),
                 )
             })
         } else {
+            self.main_pipeline.add_transform(|input, output| {
+                Ok(ProcessorPtr::create(Box::new(
+                    TransformWindowPartitionCollect::new(
+                        self.ctx.clone(),
+                        input,
+                        output,
+                        &settings,
+                        processor_id.fetch_add(1, atomic::Ordering::AcqRel),
+                        num_processors,
+                        partition.range_width,
+                        window_spill_settings.clone(),
+                        disk_spill.clone(),
+                        CompactStrategy::new(partition.rows_per_block, partition.bytes_per_block),
+                    )?,
+                )))
+            })?;
+
             self.main_pipeline
                 .add_transform(|transform_input_port, transform_output_port| {
                     let proc = TransformSerializeBlock::try_create(
```
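The builder now wires two distinct pipelines depending on `enable_stream_writer`: the stream path collects whole partitions with `TransformHilbertCollect` and lets `TransformBlockWriter` cut blocks at `bytes_per_block`, while the fallback path keeps `TransformWindowPartitionCollect` with a `CompactStrategy` followed by `TransformSerializeBlock`. A toy sketch of that dispatch (the `Stage` enum and `build_stages` are stand-ins for illustration; the real transforms take many more arguments):

```rust
// Stand-in stage descriptions; not the real pipeline API.
#[derive(Debug)]
enum Stage {
    HilbertCollect { bytes_per_block: usize },
    BlockWriter { max_block_bytes: usize },
    WindowCollect { rows_per_block: usize, bytes_per_block: usize },
    SerializeBlock,
}

fn build_stages(enable_stream_writer: bool, rows_per_block: usize, bytes_per_block: usize) -> Vec<Stage> {
    if enable_stream_writer {
        // Stream path: collect whole partitions, let the writer size the blocks.
        vec![
            Stage::HilbertCollect { bytes_per_block },
            Stage::BlockWriter { max_block_bytes: bytes_per_block },
        ]
    } else {
        // Fallback path: compact blocks per partition, then serialize.
        vec![
            Stage::WindowCollect { rows_per_block, bytes_per_block },
            Stage::SerializeBlock,
        ]
    }
}

fn main() {
    println!("{:?}", build_stages(true, 1_000_000, 128 << 20));
    println!("{:?}", build_stages(false, 1_000_000, 128 << 20));
}
```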

src/query/service/src/pipelines/processors/transforms/window/partition/data_processor_strategy.rs

Lines changed: 1 addition & 11 deletions

```diff
@@ -27,19 +27,13 @@ pub trait DataProcessorStrategy: Send + Sync + 'static {
 pub struct CompactStrategy {
     max_bytes_per_block: usize,
     max_rows_per_block: usize,
-    enable_stream_writer: bool,
 }
 
 impl CompactStrategy {
-    pub fn new(
-        max_rows_per_block: usize,
-        max_bytes_per_block: usize,
-        enable_stream_writer: bool,
-    ) -> Self {
+    pub fn new(max_rows_per_block: usize, max_bytes_per_block: usize) -> Self {
         Self {
             max_bytes_per_block,
             max_rows_per_block,
-            enable_stream_writer,
         }
     }
 
@@ -56,10 +50,6 @@ impl DataProcessorStrategy for CompactStrategy {
     const NAME: &'static str = "Compact";
 
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
-        if self.enable_stream_writer {
-            return Ok(data_blocks);
-        }
-
         let blocks_num = data_blocks.len();
         if blocks_num < 2 {
             return Ok(data_blocks);
```
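With the stream-writer bypass gone, `process_data_blocks` always attempts to merge undersized blocks up to the row/byte caps. A toy stand-in for that merge loop (operating on plain byte buffers rather than `DataBlock`s, and greatly simplified relative to the real method):

```rust
// Toy compaction: greedily concatenate buffers until a cap would be exceeded.
// Plain Vec<u8> stands in for DataBlock; only the byte cap is modeled here.
fn compact(blocks: Vec<Vec<u8>>, max_bytes_per_block: usize) -> Vec<Vec<u8>> {
    if blocks.len() < 2 {
        return blocks; // nothing to merge, mirroring the blocks_num < 2 early return
    }
    let mut out: Vec<Vec<u8>> = Vec::new();
    for block in blocks {
        match out.last_mut() {
            Some(last) if last.len() + block.len() <= max_bytes_per_block => {
                last.extend_from_slice(&block)
            }
            _ => out.push(block),
        }
    }
    out
}

fn main() {
    let blocks = vec![vec![0u8; 3], vec![0u8; 4], vec![0u8; 5]];
    let merged = compact(blocks, 8);
    // 3 + 4 = 7 fits under the cap of 8; adding 5 would not, so it starts a new block.
    assert_eq!(merged.iter().map(Vec::len).collect::<Vec<_>>(), vec![7, 5]);
}
```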

src/query/service/src/pipelines/processors/transforms/window/partition/mod.rs

Lines changed: 2 additions & 0 deletions

```diff
@@ -14,6 +14,7 @@
 
 mod data_processor_strategy;
 mod hilbert_partition_exchange;
+mod transform_hilbert_collect;
 mod transform_window_partition_collect;
 mod window_partition_buffer;
 mod window_partition_exchange;
@@ -22,6 +23,7 @@ mod window_partition_partial_top_n_exchange;
 
 pub use data_processor_strategy::*;
 pub use hilbert_partition_exchange::*;
+pub use transform_hilbert_collect::*;
 pub use transform_window_partition_collect::*;
 pub use window_partition_buffer::*;
 pub use window_partition_exchange::*;
```
