Skip to content

Commit f6ba7e2

Browse files
committed
improve recluster partition
1 parent ce16f72 commit f6ba7e2

File tree

5 files changed

+243
-20
lines changed

5 files changed

+243
-20
lines changed

src/query/service/src/pipelines/builders/builder_recluster.rs

Lines changed: 9 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@ use databend_common_pipeline_transforms::sort::RowConverter;
4646
use databend_common_pipeline_transforms::sort::Rows;
4747
use databend_common_pipeline_transforms::sort::SimpleRowConverter;
4848
use databend_common_pipeline_transforms::sort::SimpleRowsAsc;
49-
use databend_common_pipeline_transforms::MemorySettings;
5049
use databend_common_sql::evaluator::CompoundBlockOperator;
5150
use databend_common_sql::executor::physical_plans::MutationKind;
5251
use databend_common_sql::executor::physical_plans::Recluster;
@@ -61,13 +60,12 @@ use match_template::match_template;
6160

6261
use crate::pipelines::builders::SortPipelineBuilder;
6362
use crate::pipelines::processors::transforms::ReclusterPartitionExchange;
64-
use crate::pipelines::processors::transforms::ReclusterPartitionStrategy;
6563
use crate::pipelines::processors::transforms::SampleState;
6664
use crate::pipelines::processors::transforms::TransformAddOrderColumn;
6765
use crate::pipelines::processors::transforms::TransformAddStreamColumns;
68-
use crate::pipelines::processors::transforms::TransformPartitionCollect;
6966
use crate::pipelines::processors::transforms::TransformRangePartitionIndexer;
7067
use crate::pipelines::processors::transforms::TransformReclusterCollect;
68+
use crate::pipelines::processors::transforms::TransformReclusterPartition;
7169
use crate::pipelines::PipelineBuilder;
7270

7371
impl PipelineBuilder {
@@ -209,23 +207,15 @@ impl PipelineBuilder {
209207
ReclusterPartitionExchange::create(0, partitions),
210208
);
211209
let processor_id = AtomicUsize::new(0);
212-
let settings = self.ctx.get_settings();
213-
let memory_settings = MemorySettings::disable_spill();
214210
self.main_pipeline.add_transform(|input, output| {
215-
Ok(ProcessorPtr::create(Box::new(
216-
TransformPartitionCollect::new(
217-
self.ctx.clone(),
218-
input,
219-
output,
220-
&settings,
221-
processor_id.fetch_add(1, atomic::Ordering::AcqRel),
222-
num_processors,
223-
partitions,
224-
memory_settings.clone(),
225-
None,
226-
ReclusterPartitionStrategy::new(properties.clone()),
227-
)?,
228-
)))
211+
TransformReclusterPartition::try_create(
212+
input,
213+
output,
214+
properties.clone(),
215+
processor_id.fetch_add(1, atomic::Ordering::AcqRel),
216+
num_processors,
217+
partitions,
218+
)
229219
})?;
230220

231221
self.main_pipeline.add_async_accumulating_transformer(|| {

src/query/service/src/pipelines/processors/transforms/recluster/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod recluster_sample_state;
1919
mod transform_add_order_column;
2020
mod transform_range_partition_indexer;
2121
mod transform_recluster_collect;
22+
mod transform_recluster_partition;
2223

2324
pub use range_bound_sampler::RangeBoundSampler;
2425
pub use recluster_partition_exchange::ReclusterPartitionExchange;
@@ -29,3 +30,4 @@ pub use transform_add_order_column::TransformAddOrderColumn;
2930
pub use transform_range_partition_indexer::TransformRangePartitionIndexer;
3031
pub use transform_recluster_collect::ReclusterSampleMeta;
3132
pub use transform_recluster_collect::TransformReclusterCollect;
33+
pub use transform_recluster_partition::TransformReclusterPartition;
Lines changed: 230 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,230 @@
1+
// Copyright 2021 Datafuse Labs
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use std::any::Any;
16+
use std::collections::VecDeque;
17+
use std::sync::Arc;
18+
19+
use databend_common_exception::Result;
20+
use databend_common_expression::BlockMetaInfoDowncast;
21+
use databend_common_expression::DataBlock;
22+
use databend_common_pipeline_core::processors::Event;
23+
use databend_common_pipeline_core::processors::InputPort;
24+
use databend_common_pipeline_core::processors::OutputPort;
25+
use databend_common_pipeline_core::processors::Processor;
26+
use databend_common_pipeline_core::processors::ProcessorPtr;
27+
use databend_common_storages_fuse::io::StreamBlockBuilder;
28+
use databend_common_storages_fuse::io::StreamBlockProperties;
29+
30+
use crate::pipelines::processors::transforms::WindowPartitionMeta;
31+
32+
/// Phase of the processor's event/process state machine.
enum Step {
    // Idle: `event()` decides the next action (pull input, push output, finish).
    Consume,
    // An input block is available; `process()` routes its partitioned data
    // into the per-partition builders.
    Collect,
    // Input is exhausted; `process()` serializes the remaining partition data.
    Flush,
}
37+
38+
/// Write state for a single partition owned by this processor: an in-progress
/// stream block builder plus blocks buffered after the builder asks to flush.
struct PartitionData {
    // Active builder; created lazily on the first block written for this partition.
    builder: Option<StreamBlockBuilder>,
    // Blocks buffered once `builder.need_flush()` is true; drained back into a
    // fresh builder when they accumulate enough rows/bytes (`check_large_enough`).
    data_blocks: Vec<DataBlock>,
    // Estimated total byte size of `data_blocks`.
    block_size: usize,
    // Total row count of `data_blocks`.
    block_rows: usize,
}
44+
45+
impl PartitionData {
46+
fn new() -> Self {
47+
Self {
48+
builder: None,
49+
data_blocks: vec![],
50+
block_size: 0,
51+
block_rows: 0,
52+
}
53+
}
54+
55+
fn is_empty(&self) -> bool {
56+
self.builder.as_ref().is_none_or(|v| v.is_empty()) && self.data_blocks.is_empty()
57+
}
58+
}
59+
60+
/// Processor that collects range-partitioned recluster data and serializes
/// each partition it owns into stream blocks via `StreamBlockBuilder`.
pub struct TransformReclusterPartition {
    input: Arc<InputPort>,
    output: Arc<OutputPort>,

    // Configuration shared by all builders created by this processor.
    properties: Arc<StreamBlockProperties>,

    // Maps a global partition id to this processor's local index into
    // `partition_data`. Entries for partitions not owned by this processor
    // keep the default 0 and are expected never to be looked up.
    partition_id: Vec<usize>,
    // One entry per partition owned by this processor.
    partition_data: Vec<PartitionData>,
    // Serialized blocks waiting to be pushed downstream.
    output_data: VecDeque<DataBlock>,

    // Current phase of the event/process state machine.
    step: Step,
}
73+
74+
impl TransformReclusterPartition {
75+
pub fn try_create(
76+
input: Arc<InputPort>,
77+
output: Arc<OutputPort>,
78+
properties: Arc<StreamBlockProperties>,
79+
processor_id: usize,
80+
num_processors: usize,
81+
num_partitions: usize,
82+
) -> Result<ProcessorPtr> {
83+
let partitions = (0..num_partitions)
84+
.filter(|&partition| (partition * num_processors) / num_partitions == processor_id)
85+
.collect::<Vec<_>>();
86+
let mut partition_id = vec![0; num_partitions];
87+
let mut partition_data = Vec::with_capacity(num_partitions);
88+
for (new_partition_id, partition) in partitions.iter().enumerate() {
89+
partition_id[*partition] = new_partition_id;
90+
partition_data.push(PartitionData::new());
91+
}
92+
Ok(ProcessorPtr::create(Box::new(
93+
TransformReclusterPartition {
94+
input,
95+
output,
96+
properties,
97+
partition_id,
98+
partition_data,
99+
output_data: VecDeque::new(),
100+
step: Step::Consume,
101+
},
102+
)))
103+
}
104+
}
105+
106+
impl Processor for TransformReclusterPartition {
    fn name(&self) -> String {
        "TransformReclusterPartition".to_string()
    }

    fn as_any(&mut self) -> &mut dyn Any {
        self
    }

    /// Drives the state machine: schedules `process()` (Sync) when there is
    /// input to collect or residual data to flush, otherwise moves data
    /// between the ports.
    fn event(&mut self) -> Result<Event> {
        // A pending Collect/Flush set by a previous event must run first.
        if matches!(self.step, Step::Collect | Step::Flush) {
            return Ok(Event::Sync);
        }

        if self.output.is_finished() {
            self.input.finish();
            return Ok(Event::Finished);
        }

        if !self.output.can_push() {
            return Ok(Event::NeedConsume);
        }

        // Drain one already-serialized block per event.
        if let Some(data_block) = self.output_data.pop_front() {
            self.output.push_data(Ok(data_block));
            return Ok(Event::NeedConsume);
        }

        if self.input.is_finished() {
            // `partition_data` is popped during Flush, so a non-empty vec
            // means there are still partitions left to serialize.
            if !self.partition_data.is_empty() {
                self.step = Step::Flush;
                return Ok(Event::Sync);
            }
            self.output.finish();
            return Ok(Event::Finished);
        }

        if self.input.has_data() {
            self.step = Step::Collect;
            return Ok(Event::Sync);
        }

        self.input.set_need_data();
        Ok(Event::NeedData)
    }

    /// Executes the step chosen by `event()`, resetting `step` to Consume.
    fn process(&mut self) -> Result<()> {
        match std::mem::replace(&mut self.step, Step::Consume) {
            Step::Collect => {
                let data_block = self.input.pull_data().unwrap()?;
                // Blocks without a WindowPartitionMeta are silently dropped.
                if let Some(meta) = data_block
                    .get_owned_meta()
                    .and_then(WindowPartitionMeta::downcast_from)
                {
                    for (partition_id, data_block) in meta.partitioned_data.into_iter() {
                        if data_block.is_empty() {
                            continue;
                        }

                        // Translate the global partition id to our local slot.
                        let new_id = self.partition_id[partition_id];
                        // SAFETY: relies on `new_id` being a valid index into
                        // `partition_data`, i.e. on the upstream exchange only
                        // sending partitions owned by this processor (see
                        // `try_create`) — NOTE(review): confirm the exchange
                        // guarantees this; an unowned id would map to 0 or
                        // read out of bounds.
                        let partition_data =
                            unsafe { self.partition_data.get_unchecked_mut(new_id) };
                        // Lazily create the builder on first write.
                        if partition_data.builder.is_none() {
                            partition_data.builder = Some(StreamBlockBuilder::try_new_with_config(
                                self.properties.clone(),
                            )?);
                        }
                        let builder = partition_data.builder.as_mut().unwrap();
                        if !builder.need_flush() {
                            builder.write(data_block)?;
                        } else {
                            // Builder wants to flush: buffer the block and
                            // track its size/rows until the buffered set is
                            // large enough to justify finishing the block.
                            partition_data.block_size += data_block.estimate_block_size();
                            partition_data.block_rows += data_block.num_rows();
                            partition_data.data_blocks.push(data_block);

                            if self.properties.check_large_enough(
                                partition_data.block_rows,
                                partition_data.block_size,
                            ) {
                                // Finish and emit the current builder, then
                                // start a fresh one seeded with the buffered
                                // blocks.
                                let builder = partition_data.builder.take().unwrap();
                                let serialized = builder.finish()?;
                                self.output_data
                                    .push_back(DataBlock::empty_with_meta(Box::new(serialized)));

                                let mut builder = StreamBlockBuilder::try_new_with_config(
                                    self.properties.clone(),
                                )?;
                                for block in
                                    std::mem::take(&mut partition_data.data_blocks).into_iter()
                                {
                                    builder.write(block)?;
                                }
                                partition_data.builder = Some(builder);
                                partition_data.block_rows = 0;
                                partition_data.block_size = 0;
                            }
                        }
                    }
                }
            }
            Step::Flush => {
                // Serialize at most one non-empty partition per call (empty
                // ones are popped and discarded); `event()` re-enters Flush
                // until `partition_data` is drained.
                while let Some(mut partition_data) = self.partition_data.pop() {
                    if partition_data.is_empty() {
                        continue;
                    }

                    let mut builder = if partition_data.builder.is_none() {
                        StreamBlockBuilder::try_new_with_config(self.properties.clone())?
                    } else {
                        partition_data.builder.take().unwrap()
                    };
                    // Write any still-buffered blocks before finishing.
                    for block in partition_data.data_blocks {
                        builder.write(block)?;
                    }
                    let serialized = builder.finish()?;
                    self.output_data
                        .push_back(DataBlock::empty_with_meta(Box::new(serialized)));
                    break;
                }
            }
            // `event()` only schedules Sync for Collect/Flush.
            _ => unreachable!(),
        }
        Ok(())
    }
}

src/query/storages/fuse/src/operations/common/processors/transform_block_writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ impl Processor for TransformBlockBuilder {
116116
}
117117

118118
if self.output.is_finished() {
119+
self.input.finish();
119120
return Ok(Event::Finished);
120121
}
121122

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ impl ReclusterMutator {
206206
settings.get_max_memory_usage()? - GLOBAL_MEM_STAT.get_memory_usage() as u64;
207207
let memory_threshold = settings
208208
.get_recluster_block_size()?
209-
.min(avail_memory_usage * 40 / 100) as usize;
209+
.min(avail_memory_usage * 30 / 100) as usize;
210210
// specify a rather small value, so that `recluster_block_size` might be tuned to lower value.
211211
let mut max_blocks_per_task = (memory_threshold / self.average_size).max(2);
212212
let block_per_seg = self.block_thresholds.block_per_segment;

0 commit comments

Comments
 (0)