Commit ce16f72

fix test

1 parent 8a4cdce · commit ce16f72

15 files changed (+88 / -84 lines)

src/query/expression/src/utils/block_thresholds.rs (4 additions, 4 deletions)

@@ -39,7 +39,7 @@ impl Default for BlockThresholds {
             max_bytes_per_block: DEFAULT_BLOCK_BUFFER_SIZE * 2,
             min_bytes_per_block: (DEFAULT_BLOCK_BUFFER_SIZE * 4).div_ceil(5),
             max_compressed_per_block: DEFAULT_BLOCK_COMPRESSED_SIZE,
-            min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 4).div_ceil(5),
+            min_compressed_per_block: (DEFAULT_BLOCK_COMPRESSED_SIZE * 3).div_ceil(5),
             block_per_segment: DEFAULT_BLOCK_PER_SEGMENT,
         }
     }
@@ -58,7 +58,7 @@ impl BlockThresholds {
             max_bytes_per_block: bytes_per_block * 2,
             min_bytes_per_block: (bytes_per_block * 4).div_ceil(5),
             max_compressed_per_block,
-            min_compressed_per_block: (max_compressed_per_block * 4).div_ceil(5),
+            min_compressed_per_block: (max_compressed_per_block * 3).div_ceil(5),
             block_per_segment,
         }
     }
@@ -153,7 +153,7 @@ impl BlockThresholds {
         let bytes_per_block = total_bytes.div_ceil(block_num_by_compressed);
         // Adjust the number of blocks based on block size thresholds.
         let max_bytes_per_block = self.max_bytes_per_block.min(400 * 1024 * 1024);
-        let min_bytes_per_block = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
+        let min_bytes_per_block = self.min_bytes_per_block.min(100 * 1024 * 1024);
         let block_nums = if bytes_per_block > max_bytes_per_block {
             // Case 1: If the block size is too bigger.
             total_bytes.div_ceil(max_bytes_per_block)
@@ -201,7 +201,7 @@ impl BlockThresholds {
         // Adjust block count based on byte size thresholds.
         let bytes_per_block = total_bytes.div_ceil(by_compressed);
         let max_bytes = self.max_bytes_per_block.min(400 * 1024 * 1024);
-        let min_bytes = (self.min_bytes_per_block / 2).min(50 * 1024 * 1024);
+        let min_bytes = self.min_bytes_per_block.min(100 * 1024 * 1024);
         let total_partitions = if bytes_per_block > max_bytes {
             // Block size is too large.
             total_bytes / max_bytes
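Taken together, these hunks relax the compressed-size floor from 4/5 to 3/5 of the maximum, and stop halving the byte floor during recluster sizing while raising its cap from 50 MiB to 100 MiB. A minimal sketch of the revised arithmetic, using hypothetical placeholder values since the real DEFAULT_* constants are defined elsewhere in the source:

```rust
// Sketch of the new threshold arithmetic. The constant values below are
// hypothetical placeholders, not the real DEFAULT_* constants.
fn main() {
    let default_block_buffer_size: usize = 16 * 1024 * 1024; // hypothetical
    let default_block_compressed_size: usize = 8 * 1024 * 1024; // hypothetical

    // Unchanged: the byte floor stays at 4/5 of the buffer size.
    let min_bytes_per_block = (default_block_buffer_size * 4).div_ceil(5);
    // Changed: the compressed floor drops from 4/5 to 3/5 of the maximum.
    let min_compressed_per_block = (default_block_compressed_size * 3).div_ceil(5);

    // Changed: recluster sizing now clamps against the full byte floor with a
    // 100 MiB cap, instead of half the floor capped at 50 MiB.
    let min_bytes = min_bytes_per_block.min(100 * 1024 * 1024);

    println!("{min_bytes_per_block} {min_compressed_per_block} {min_bytes}");
}
```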

src/query/expression/tests/it/block_thresholds.rs (3 additions, 3 deletions)

@@ -105,8 +105,8 @@ fn test_calc_rows_for_recluster() {
     assert_eq!(result, 300);

     // Case 2: If the block size is too smaller.
-    let result = t.calc_rows_for_recluster(4_000, 2_000_000, 600_000);
-    assert_eq!(result, 800);
+    let result = t.calc_rows_for_recluster(4_000, 1_600_000, 600_000);
+    assert_eq!(result, 2000);

     // Case 3: use the compressed-based block count.
     let result = t.calc_rows_for_recluster(4_000, 10_000_000, 600_000);
@@ -131,7 +131,7 @@ fn test_calc_partitions_for_recluster() {
     assert_eq!(result, 15);

     // Case 2: If the block size is too smaller.
-    let result = t.calc_partitions_for_recluster(4_000, 800_000, 800_000);
+    let result = t.calc_partitions_for_recluster(4_000, 1_600_000, 800_000);
     assert_eq!(result, 2);

     // Case 3: use the compressed-based block count.

src/query/service/src/interpreters/interpreter_table_recluster.rs (1 addition, 1 deletion)

@@ -631,7 +631,7 @@ impl ReclusterTableInterpreter {
         let database = &self.plan.database;
         let table = &self.plan.table;
         let settings = self.ctx.get_settings();
-        let sample_size = settings.get_hilbert_sample_size_per_block()?;
+        let sample_size = settings.get_recluster_sample_size_per_block()?;

         let name_resolution_ctx = NameResolutionContext::try_from(settings.as_ref())?;
         let ast_exprs = tbl.resolve_cluster_keys(self.ctx.clone()).unwrap();

src/query/service/src/pipelines/builders/builder_recluster.rs (10 additions, 6 deletions)

@@ -187,15 +187,19 @@ impl PipelineBuilder {
             .collect();

         let num_processors = self.main_pipeline.output_len();
-        let sample_rate = 0.01;
+        let sample_size = self
+            .ctx
+            .get_settings()
+            .get_recluster_sample_size_per_block()?
+            as usize;
         let partitions = block_thresholds.calc_partitions_for_recluster(
             task.total_rows,
             task.total_bytes,
             task.total_compressed,
         );
         let state = SampleState::new(num_processors, partitions);
         let recluster_pipeline_builder =
-            ReclusterPipelineBuilder::create(schema, sort_descs.into(), sample_rate)
+            ReclusterPipelineBuilder::create(schema, sort_descs.into(), sample_size)
                 .with_state(state);
         recluster_pipeline_builder
             .build_recluster_sample_pipeline(&mut self.main_pipeline)?;
@@ -314,21 +318,21 @@ struct ReclusterPipelineBuilder {
     schema: DataSchemaRef,
     sort_desc: Arc<[SortColumnDescription]>,
     state: Option<Arc<SampleState>>,
-    sample_rate: f64,
+    sample_size: usize,
     seed: u64,
 }

 impl ReclusterPipelineBuilder {
     fn create(
         schema: DataSchemaRef,
         sort_desc: Arc<[SortColumnDescription]>,
-        sample_rate: f64,
+        sample_size: usize,
     ) -> Self {
         Self {
             schema,
             sort_desc,
             state: None,
-            sample_rate,
+            sample_size,
             seed: rand::random(),
         }
     }
@@ -382,7 +386,7 @@ impl ReclusterPipelineBuilder {
         })?;
         let offset = self.schema.num_fields();
         pipeline.add_accumulating_transformer(|| {
-            TransformReclusterCollect::<R::Type>::new(offset, self.sample_rate, self.seed)
+            TransformReclusterCollect::<R::Type>::new(offset, self.sample_size, self.seed)
         });
         pipeline.add_transform(|input, output| {
             Ok(ProcessorPtr::create(TransformRangePartitionIndexer::<

src/query/service/src/pipelines/processors/transforms/recluster/range_bound_sampler.rs (6 additions, 11 deletions)

@@ -26,7 +26,7 @@ pub struct RangeBoundSampler<T>
 where T: ValueType
 {
     offset: usize,
-    sample_rate: f64,
+    sample_size: usize,
     rng: SmallRng,

     values: Vec<(u64, Vec<Scalar>)>,
@@ -36,11 +36,11 @@ where T: ValueType
 impl<T> RangeBoundSampler<T>
 where T: ValueType
 {
-    pub fn new(offset: usize, sample_rate: f64, seed: u64) -> Self {
+    pub fn new(offset: usize, sample_size: usize, seed: u64) -> Self {
         let rng = SmallRng::seed_from_u64(seed);
         Self {
             offset,
-            sample_rate,
+            sample_size,
             rng,
             values: vec![],
             _t: PhantomData,
@@ -58,15 +58,10 @@ where
         assert!(rows > 0);
         let column = data.get_by_offset(self.offset).to_column(rows);

-        let sample_size = std::cmp::max((self.sample_rate * rows as f64).ceil() as usize, 100);
+        let sample_size = std::cmp::min(self.sample_size, rows);
         let mut indices = (0..rows).collect::<Vec<_>>();
-
-        let sampled_indices = if rows > sample_size {
-            indices.shuffle(&mut self.rng);
-            &indices[..sample_size]
-        } else {
-            &indices
-        };
+        indices.shuffle(&mut self.rng);
+        let sampled_indices = &indices[..sample_size];

         let column = T::try_downcast_column(&column).unwrap();
         let sample_values = sampled_indices
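The sampler now draws a fixed number of rows per block instead of a 1% rate with a floor of 100. A self-contained sketch of the new selection logic, assuming only the `rand` crate (with its `small_rng` feature) as a dependency:

```rust
use rand::rngs::SmallRng;
use rand::seq::SliceRandom;
use rand::SeedableRng;

/// Shuffle all row indices and keep the first `sample_size`, clamped to the
/// row count: a fixed-size sample without replacement, deterministic for a
/// given seed, mirroring the new RangeBoundSampler behavior.
fn sample_rows(rows: usize, sample_size: usize, seed: u64) -> Vec<usize> {
    let mut rng = SmallRng::seed_from_u64(seed);
    let sample_size = sample_size.min(rows);
    let mut indices: Vec<usize> = (0..rows).collect();
    indices.shuffle(&mut rng);
    indices[..sample_size].to_vec()
}

fn main() {
    // With fewer rows than the requested sample, every row is selected.
    assert_eq!(sample_rows(3, 1000, 42).len(), 3);
    // Otherwise exactly `sample_size` distinct indices come back.
    assert_eq!(sample_rows(10_000, 1000, 42).len(), 1000);
}
```

Clamping to `rows` also removes the old branch on `rows > sample_size`: the slice `&indices[..sample_size]` is always in bounds.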

src/query/service/src/pipelines/processors/transforms/recluster/recluster_partition_strategy.rs (33 additions, 7 deletions)

@@ -32,6 +32,10 @@ impl ReclusterPartitionStrategy {
     pub fn new(properties: Arc<StreamBlockProperties>) -> Self {
         Self { properties }
     }
+
+    fn concat_blocks(blocks: Vec<DataBlock>) -> Result<DataBlock> {
+        DataBlock::concat(&blocks)
+    }
 }

 impl PartitionProcessStrategy for ReclusterPartitionStrategy {
@@ -51,22 +55,44 @@ impl PartitionProcessStrategy for ReclusterPartitionStrategy {
     /// Stream write each block, and flush it conditionally based on builder status
     /// and input size estimation.
     fn process_data_blocks(&self, data_blocks: Vec<DataBlock>) -> Result<Vec<DataBlock>> {
-        let mut input_sizes: usize = data_blocks.iter().map(|b| b.estimate_block_size()).sum();
-        let mut input_rows: usize = data_blocks.iter().map(|b| b.num_rows()).sum();
+        let blocks_num = data_blocks.len();
+        let mut accumulated_rows = 0;
+        let mut accumulated_bytes = 0;
+        let mut pending_blocks = Vec::with_capacity(blocks_num);
+        let mut staged_blocks = Vec::with_capacity(blocks_num);
+        let mut compacted = Vec::with_capacity(blocks_num);
+        for block in data_blocks {
+            accumulated_rows += block.num_rows();
+            accumulated_bytes += block.estimate_block_size();
+            pending_blocks.push(block);
+            if !self
+                .properties
+                .check_large_enough(accumulated_rows, accumulated_bytes)
+            {
+                continue;
+            }
+            if !staged_blocks.is_empty() {
+                compacted.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
+            }
+            std::mem::swap(&mut staged_blocks, &mut pending_blocks);
+            accumulated_rows = 0;
+            accumulated_bytes = 0;
+        }
+        staged_blocks.append(&mut pending_blocks);
+        if !staged_blocks.is_empty() {
+            compacted.push(Self::concat_blocks(std::mem::take(&mut staged_blocks))?);
+        }

         let mut result = Vec::new();
         let mut builder = StreamBlockBuilder::try_new_with_config(self.properties.clone())?;
-        for block in data_blocks {
-            input_sizes -= block.estimate_block_size();
-            input_rows -= block.num_rows();
+        for block in compacted {
             builder.write(block)?;
-            if builder.need_flush() && self.properties.check_large_enough(input_rows, input_sizes) {
+            if builder.need_flush() {
                 let serialized = builder.finish()?;
                 result.push(DataBlock::empty_with_meta(Box::new(serialized)));
                 builder = StreamBlockBuilder::try_new_with_config(self.properties.clone())?;
             }
         }
-
         if !builder.is_empty() {
             let serialized = builder.finish()?;
             result.push(DataBlock::empty_with_meta(Box::new(serialized)));
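The rewritten `process_data_blocks` now compacts undersized inputs before anything reaches the stream builder: blocks accumulate in `pending_blocks` until `check_large_enough` passes, the previously staged batch is flushed, and the new batch is held back so any undersized tail can be merged into it. A simplified sketch of that staging pattern, with a plain size predicate standing in for `check_large_enough` and a row/byte-count struct standing in for `DataBlock`:

```rust
/// Stand-in for DataBlock carrying just the sizes the staging logic needs.
struct Block {
    rows: usize,
    bytes: usize,
}

/// Group blocks into batches that each satisfy the size predicate, holding
/// the last qualifying batch back so an undersized tail merges into it
/// rather than becoming a tiny trailing output.
fn compact(blocks: Vec<Block>, min_rows: usize, min_bytes: usize) -> Vec<Vec<Block>> {
    let mut pending: Vec<Block> = Vec::new(); // accumulating toward the threshold
    let mut staged: Vec<Block> = Vec::new(); // last batch that crossed it
    let mut out = Vec::new();
    let (mut rows, mut bytes) = (0, 0);
    for b in blocks {
        rows += b.rows;
        bytes += b.bytes;
        pending.push(b);
        // Assumed semantics of check_large_enough: "large enough" once either
        // the row count or the byte count reaches its floor.
        if rows < min_rows && bytes < min_bytes {
            continue;
        }
        if !staged.is_empty() {
            out.push(std::mem::take(&mut staged)); // flush the previous batch
        }
        std::mem::swap(&mut staged, &mut pending); // hold this batch back
        rows = 0;
        bytes = 0;
    }
    // Merge any undersized remainder into the held-back batch.
    staged.append(&mut pending);
    if !staged.is_empty() {
        out.push(staged);
    }
    out
}
```

With pre-compaction in place, the builder loop no longer needs the old `check_large_enough(input_rows, input_sizes)` guard: every block it writes already meets the size floor, so `need_flush` alone decides when to cut a block.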

src/query/service/src/pipelines/processors/transforms/recluster/transform_recluster_collect.rs (2 additions, 2 deletions)

@@ -36,10 +36,10 @@ where
     T: ArgType + Send + Sync,
     T::Scalar: Ord + Send,
 {
-    pub fn new(offset: usize, sample_rate: f64, seed: u64) -> Self {
+    pub fn new(offset: usize, sample_size: usize, seed: u64) -> Self {
         Self {
             input_data: vec![],
-            sampler: RangeBoundSampler::<T>::new(offset, sample_rate, seed),
+            sampler: RangeBoundSampler::<T>::new(offset, sample_size, seed),
         }
     }
 }

src/query/settings/src/settings_default.rs (4 additions, 4 deletions)

@@ -755,7 +755,7 @@ impl DefaultSettings {
             range: Some(SettingRange::Numeric(0..=1)),
         }),
         ("enable_distributed_compact", DefaultSettingValue {
-            value: UserSettingValue::UInt64(0),
+            value: UserSettingValue::UInt64(1),
             desc: "Enables distributed execution of table compaction.",
             mode: SettingMode::Both,
             scope: SettingScope::Both,
@@ -870,7 +870,7 @@ impl DefaultSettings {
             range: Some(SettingRange::Numeric(2..=u64::MAX)),
         }),
         ("enable_distributed_recluster", DefaultSettingValue {
-            value: UserSettingValue::UInt64(0),
+            value: UserSettingValue::UInt64(1),
             desc: "Enable distributed execution of table recluster.",
             mode: SettingMode::Both,
             scope: SettingScope::Both,
@@ -1220,9 +1220,9 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(1..=65535)),
         }),
-        ("hilbert_sample_size_per_block", DefaultSettingValue {
+        ("recluster_sample_size_per_block", DefaultSettingValue {
             value: UserSettingValue::UInt64(1000),
-            desc: "Specifies the number of sample points per block used in Hilbert clustering.",
+            desc: "Specifies the number of sample points per block used in clustering.",
             mode: SettingMode::Both,
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(1..=u64::MAX)),

src/query/settings/src/settings_getter_setter.rs (2 additions, 2 deletions)

@@ -889,8 +889,8 @@ impl Settings {
         self.try_get_u64("hilbert_num_range_ids")
     }

-    pub fn get_hilbert_sample_size_per_block(&self) -> Result<u64> {
-        self.try_get_u64("hilbert_sample_size_per_block")
+    pub fn get_recluster_sample_size_per_block(&self) -> Result<u64> {
+        self.try_get_u64("recluster_sample_size_per_block")
     }

     pub fn get_hilbert_clustering_min_bytes(&self) -> Result<u64> {

src/query/storages/fuse/src/io/write/stream/block_builder.rs (2 additions, 2 deletions)

@@ -238,9 +238,9 @@ impl StreamBlockBuilder {
     pub fn need_flush(&self) -> bool {
         let file_size = self.block_writer.compressed_size();
         self.row_count >= self.properties.block_thresholds.min_rows_per_block
-            || self.block_size >= self.properties.block_thresholds.max_bytes_per_block
+            || self.block_size >= self.properties.block_thresholds.min_bytes_per_block * 2
             || (file_size >= self.properties.block_thresholds.min_compressed_per_block
-                && self.block_size >= self.properties.block_thresholds.min_bytes_per_block / 2)
+                && self.block_size >= self.properties.block_thresholds.min_bytes_per_block)
     }

     pub fn write(&mut self, block: DataBlock) -> Result<()> {
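The flush trigger now keys off `min_bytes_per_block` in both byte-size branches: flush at twice the byte floor regardless of compression, or at the floor itself once the compressed size also clears its floor. A minimal sketch of the revised predicate, with the builder state and thresholds passed in explicitly for illustration rather than read from `StreamBlockProperties`:

```rust
/// Revised flush predicate from StreamBlockBuilder::need_flush, rewritten as
/// a free function over explicit arguments for illustration.
fn need_flush(
    row_count: usize,
    block_size: usize, // in-memory (uncompressed) size so far
    file_size: usize,  // compressed size written so far
    min_rows_per_block: usize,
    min_bytes_per_block: usize,
    min_compressed_per_block: usize,
) -> bool {
    row_count >= min_rows_per_block
        || block_size >= min_bytes_per_block * 2
        || (file_size >= min_compressed_per_block && block_size >= min_bytes_per_block)
}

fn main() {
    // Below every threshold: keep writing.
    assert!(!need_flush(10, 1_000, 100, 1_000_000, 1_000_000, 500_000));
    // Twice the byte floor forces a flush even if compression lags.
    assert!(need_flush(10, 2_000_000, 100, 1_000_000, 1_000_000, 500_000));
    // Compressed floor plus the byte floor also flushes.
    assert!(need_flush(10, 1_000_000, 500_000, 1_000_000, 1_000_000, 500_000));
}
```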
